cengal.parallel_execution.coroutines.coro_standard_services.event_bus.versions.v_0.event_bus
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = [ 20 'EventBus', 'EventBusRequest', 'EventID', 'Handler' 21] 22 23 24from enum import Enum 25from typing import Dict, Set, List, Tuple, Union, Type, Optional, Any 26from cengal.parallel_execution.coroutines.coro_scheduler import * 27from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import * 28from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 29from cengal.time_management.repeat_for_a_time import Tracer 30from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence 31 32 33""" 34Module Docstring 35Docstrings: http://www.python.org/dev/peps/pep-0257/ 36""" 37 38__author__ = "ButenkoMS <gtalk@butenkoms.space>" 39__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 40__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 41__license__ = "Apache License, Version 2.0" 42__version__ = "4.4.1" 43__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 44__email__ = "gtalk@butenkoms.space" 45# __status__ = "Prototype" 46__status__ = "Development" 47# __status__ = "Production" 48 49 50EventID = str 51Handler = Callable[[EventID, Any], None] 52 53 54class EventBusRequest(ServiceRequest): 55 def register_handler(self, event: EventID, handler: Handler) -> None: 56 return self._save(0, event, handler) 57 58 def remove_handler(self, event: EventID, handler: Handler) -> None: 59 return self._save(1, event, handler) 60 61 def send_event(self, event: EventID, data: Any) -> None: 62 return self._save(2, event, data) 63 64 def set_priority(self, priority: CoroPriority) -> None: 65 return self._save(3, priority) 66 67 68class EventBus(Service, EntityStatsMixin): 69 def __init__(self, loop: CoroSchedulerType): 70 super(EventBus, self).__init__(loop) 71 72 self._request_workers = { 73 0: self._on_register_handler, 74 1: self._on_remove_handler, 75 2: self._on_send_event, 76 3: self._on_set_priority, 77 } 78 79 self.handlers: Dict[EventID, Set[Handler]] = dict() 80 self.events: Dict[EventID, List[Any]] = dict() 81 self.priority: CoroPriority = CoroPriority.low 82 self._in_processing: bool = False 83 84 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 85 if EntityStatsMixin.StatsLevel.info == stats_level: 86 func_info = full_func_info_to_printable_dict 87 else: 88 func_info = full_func_info_to_dict 89 90 handlers_set: Set = set() 91 for handlers in self.handlers.values(): 92 handlers_set += handlers 93 94 return type(self).__name__, { 95 'priority': str(self.priority), 96 'events': { 97 'num': len(self.events), 98 'list': set(self.events) 99 }, 100 'handlers': { 101 'num': len(handlers_set), 102 'list': [func_info(handler) for handler in handlers_set] 103 } 104 } 105 106 def single_task_registration_or_immediate_processing(self, request: Optional[EventBusRequest]=None 107 ) -> Tuple[bool, Any, None]: 108 if request is not None: 109 return self.resolve_request(request) 110 return True, None, None 111 112 def full_processing_iteration(self): 113 if self._in_processing: 114 return 115 116 handlers_buff = self.handlers 117 self.handlers = type(handlers_buff)() 118 119 events_buff = self.events 120 self.events = type(events_buff)() 121 122 interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys() 123 if interested_events: 124 async def event_processing_coro(interface: Interface, 125 interested_events: Set[EventID], 126 events: Dict[EventID, List[Any]], 127 handlers: Dict[EventID, Set[Handler]], 128 priority: CoroPriority 129 ): 130 loop: CoroSchedulerType = interface._loop 131 current_coro_interface_buff: Interface = loop.current_coro_interface 132 ly = await agly(priority) 133 for event in interested_events: 134 event_data_list = events[event] 135 for data in event_data_list: 136 event_handlers = handlers[event] 137 for handler in event_handlers: 138 try: 139 loop.current_coro_interface = None 140 handler(event, data) 141 except: 142 loop.logger.exception('EventBus. Event handler error') 143 finally: 144 loop.current_coro_interface = current_coro_interface_buff 145 146 await ly() 147 148 coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro, 149 interested_events, events_buff, handlers_buff, 150 self.priority) 151 # coro.is_background_coro = True # must not be background coro: it is not an endless coro 152 coro.add_on_coro_del_handler(self._on_coro_del_handler) 153 self._in_processing = True 154 155 self.make_dead() 156 157 def _on_register_handler(self, event: EventID, handler: Handler): 158 if event not in self.handlers: 159 self.handlers[event] = set() 160 161 self.handlers[event].add(handler) 162 self.make_live() 163 return True, None, None 164 165 def _on_remove_handler(self, event: EventID, handler: Handler): 166 if event in self.handlers: 167 self.handlers[event].remove(handler) 168 169 return True, None, None 170 171 def _on_send_event(self, event: EventID, data: Any): 172 if event not in self.events: 173 self.events[event] = list() 174 175 self.events[event].append(data) 176 177 self.make_live() 178 return True, None, None 179 180 def _on_set_priority(self, priority: CoroPriority): 181 self.priority = priority 182 183 return True, None, None 184 185 def in_work(self): 186 result = (not self._in_processing) & bool(self.handlers) & bool(self.events) 187 return self.thrifty_in_work(result) 188 189 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 190 self._in_processing = False 191 if bool(self.handlers) & bool(self.events): 192 self.make_live() 193 194 return False 195 196 197EventBusRequest.default_service_type = EventBus 198 199 200def try_send_event( 201 backup_scheduler: Optional[CoroSchedulerType], 202 event: EventID, data: Any): 203 def event_sender( 204 interface: Interface, 205 event: EventID, data: Any): 206 interface(EventBus, EventBusRequest().send_event(event, data)) 207 208 try_put_coro_to(get_interface_and_loop_with_explicit_loop(backup_scheduler), CoroType.auto, event_sender, event, data)
class
EventBus(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
69class EventBus(Service, EntityStatsMixin): 70 def __init__(self, loop: CoroSchedulerType): 71 super(EventBus, self).__init__(loop) 72 73 self._request_workers = { 74 0: self._on_register_handler, 75 1: self._on_remove_handler, 76 2: self._on_send_event, 77 3: self._on_set_priority, 78 } 79 80 self.handlers: Dict[EventID, Set[Handler]] = dict() 81 self.events: Dict[EventID, List[Any]] = dict() 82 self.priority: CoroPriority = CoroPriority.low 83 self._in_processing: bool = False 84 85 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 86 if EntityStatsMixin.StatsLevel.info == stats_level: 87 func_info = full_func_info_to_printable_dict 88 else: 89 func_info = full_func_info_to_dict 90 91 handlers_set: Set = set() 92 for handlers in self.handlers.values(): 93 handlers_set += handlers 94 95 return type(self).__name__, { 96 'priority': str(self.priority), 97 'events': { 98 'num': len(self.events), 99 'list': set(self.events) 100 }, 101 'handlers': { 102 'num': len(handlers_set), 103 'list': [func_info(handler) for handler in handlers_set] 104 } 105 } 106 107 def single_task_registration_or_immediate_processing(self, request: Optional[EventBusRequest]=None 108 ) -> Tuple[bool, Any, None]: 109 if request is not None: 110 return self.resolve_request(request) 111 return True, None, None 112 113 def full_processing_iteration(self): 114 if self._in_processing: 115 return 116 117 handlers_buff = self.handlers 118 self.handlers = type(handlers_buff)() 119 120 events_buff = self.events 121 self.events = type(events_buff)() 122 123 interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys() 124 if interested_events: 125 async def event_processing_coro(interface: Interface, 126 interested_events: Set[EventID], 127 events: Dict[EventID, List[Any]], 128 handlers: Dict[EventID, Set[Handler]], 129 priority: CoroPriority 130 ): 131 loop: CoroSchedulerType = interface._loop 132 current_coro_interface_buff: Interface = loop.current_coro_interface 133 ly = await agly(priority) 134 for event in interested_events: 135 event_data_list = events[event] 136 for data in event_data_list: 137 event_handlers = handlers[event] 138 for handler in event_handlers: 139 try: 140 loop.current_coro_interface = None 141 handler(event, data) 142 except: 143 loop.logger.exception('EventBus. Event handler error') 144 finally: 145 loop.current_coro_interface = current_coro_interface_buff 146 147 await ly() 148 149 coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro, 150 interested_events, events_buff, handlers_buff, 151 self.priority) 152 # coro.is_background_coro = True # must not be background coro: it is not an endless coro 153 coro.add_on_coro_del_handler(self._on_coro_del_handler) 154 self._in_processing = True 155 156 self.make_dead() 157 158 def _on_register_handler(self, event: EventID, handler: Handler): 159 if event not in self.handlers: 160 self.handlers[event] = set() 161 162 self.handlers[event].add(handler) 163 self.make_live() 164 return True, None, None 165 166 def _on_remove_handler(self, event: EventID, handler: Handler): 167 if event in self.handlers: 168 self.handlers[event].remove(handler) 169 170 return True, None, None 171 172 def _on_send_event(self, event: EventID, data: Any): 173 if event not in self.events: 174 self.events[event] = list() 175 176 self.events[event].append(data) 177 178 self.make_live() 179 return True, None, None 180 181 def _on_set_priority(self, priority: CoroPriority): 182 self.priority = priority 183 184 return True, None, None 185 186 def in_work(self): 187 result = (not self._in_processing) & bool(self.handlers) & bool(self.events) 188 return self.thrifty_in_work(result) 189 190 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 191 self._in_processing = False 192 if bool(self.handlers) & bool(self.events): 193 self.make_live() 194 195 return False
EventBus( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
70 def __init__(self, loop: CoroSchedulerType): 71 super(EventBus, self).__init__(loop) 72 73 self._request_workers = { 74 0: self._on_register_handler, 75 1: self._on_remove_handler, 76 2: self._on_send_event, 77 3: self._on_set_priority, 78 } 79 80 self.handlers: Dict[EventID, Set[Handler]] = dict() 81 self.events: Dict[EventID, List[Any]] = dict() 82 self.priority: CoroPriority = CoroPriority.low 83 self._in_processing: bool = False
priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
85 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 86 if EntityStatsMixin.StatsLevel.info == stats_level: 87 func_info = full_func_info_to_printable_dict 88 else: 89 func_info = full_func_info_to_dict 90 91 handlers_set: Set = set() 92 for handlers in self.handlers.values(): 93 handlers_set += handlers 94 95 return type(self).__name__, { 96 'priority': str(self.priority), 97 'events': { 98 'num': len(self.events), 99 'list': set(self.events) 100 }, 101 'handlers': { 102 'num': len(handlers_set), 103 'list': [func_info(handler) for handler in handlers_set] 104 } 105 }
def
single_task_registration_or_immediate_processing( self, request: Union[EventBusRequest, NoneType] = None) -> Tuple[bool, Any, NoneType]:
def
full_processing_iteration(self):
113 def full_processing_iteration(self): 114 if self._in_processing: 115 return 116 117 handlers_buff = self.handlers 118 self.handlers = type(handlers_buff)() 119 120 events_buff = self.events 121 self.events = type(events_buff)() 122 123 interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys() 124 if interested_events: 125 async def event_processing_coro(interface: Interface, 126 interested_events: Set[EventID], 127 events: Dict[EventID, List[Any]], 128 handlers: Dict[EventID, Set[Handler]], 129 priority: CoroPriority 130 ): 131 loop: CoroSchedulerType = interface._loop 132 current_coro_interface_buff: Interface = loop.current_coro_interface 133 ly = await agly(priority) 134 for event in interested_events: 135 event_data_list = events[event] 136 for data in event_data_list: 137 event_handlers = handlers[event] 138 for handler in event_handlers: 139 try: 140 loop.current_coro_interface = None 141 handler(event, data) 142 except: 143 loop.logger.exception('EventBus. Event handler error') 144 finally: 145 loop.current_coro_interface = current_coro_interface_buff 146 147 await ly() 148 149 coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro, 150 interested_events, events_buff, handlers_buff, 151 self.priority) 152 # coro.is_background_coro = True # must not be background coro: it is not an endless coro 153 coro.add_on_coro_del_handler(self._on_coro_del_handler) 154 self._in_processing = True 155 156 self.make_dead()
def
in_work(self):
186 def in_work(self): 187 result = (not self._in_processing) & bool(self.handlers) & bool(self.events) 188 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class
EventBusRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
55class EventBusRequest(ServiceRequest): 56 def register_handler(self, event: EventID, handler: Handler) -> None: 57 return self._save(0, event, handler) 58 59 def remove_handler(self, event: EventID, handler: Handler) -> None: 60 return self._save(1, event, handler) 61 62 def send_event(self, event: EventID, data: Any) -> None: 63 return self._save(2, event, data) 64 65 def set_priority(self, priority: CoroPriority) -> None: 66 return self._save(3, priority)
def
set_priority( self, priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority) -> None:
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'EventBus'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
EventID =
<class 'str'>
Handler =
typing.Callable[[str, typing.Any], NoneType]