cengal.parallel_execution.coroutines.coro_standard_services.event_bus.versions.v_0.event_bus

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = [
 20    'EventBus', 'EventBusRequest', 'EventID', 'Handler'
 21]
 22
 23
 24from enum import Enum
 25from typing import Dict, Set, List, Tuple, Union, Type, Optional, Any
 26from cengal.parallel_execution.coroutines.coro_scheduler import *
 27from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import *
 28from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
 29from cengal.time_management.repeat_for_a_time import Tracer
 30from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence
 31
 32
 33"""
 34Module Docstring
 35Docstrings: http://www.python.org/dev/peps/pep-0257/
 36"""
 37
 38__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 39__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 40__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 41__license__ = "Apache License, Version 2.0"
 42__version__ = "4.4.1"
 43__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 44__email__ = "gtalk@butenkoms.space"
 45# __status__ = "Prototype"
 46__status__ = "Development"
 47# __status__ = "Production"
 48
 49
# Identifier of an event; handlers and pending event data are keyed by it.
EventID = str
# Event handler signature: invoked as ``handler(event, data)``; its return
# value is not used by the bus.
Handler = Callable[[EventID, Any], None]
 52
 53
 54class EventBusRequest(ServiceRequest):
 55    def register_handler(self, event: EventID, handler: Handler) -> None:
 56        return self._save(0, event, handler)
 57
 58    def remove_handler(self, event: EventID, handler: Handler) -> None:
 59        return self._save(1, event, handler)
 60
 61    def send_event(self, event: EventID, data: Any) -> None:
 62        return self._save(2, event, data)
 63
 64    def set_priority(self, priority: CoroPriority) -> None:
 65        return self._save(3, priority)
 66
 67
 68class EventBus(Service, EntityStatsMixin):
 69    def __init__(self, loop: CoroSchedulerType):
 70        super(EventBus, self).__init__(loop)
 71
 72        self._request_workers = {
 73            0: self._on_register_handler,
 74            1: self._on_remove_handler,
 75            2: self._on_send_event,
 76            3: self._on_set_priority,
 77        }
 78
 79        self.handlers: Dict[EventID, Set[Handler]] = dict()
 80        self.events: Dict[EventID, List[Any]] = dict()
 81        self.priority: CoroPriority = CoroPriority.low
 82        self._in_processing: bool = False
 83
 84    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 85        if EntityStatsMixin.StatsLevel.info == stats_level:
 86            func_info = full_func_info_to_printable_dict
 87        else:
 88            func_info = full_func_info_to_dict
 89
 90        handlers_set: Set = set()
 91        for handlers in self.handlers.values():
 92            handlers_set += handlers
 93            
 94        return type(self).__name__, {
 95            'priority': str(self.priority),
 96            'events': {
 97                'num': len(self.events),
 98                'list': set(self.events)
 99            },
100            'handlers': {
101                'num': len(handlers_set),
102                'list': [func_info(handler) for handler in handlers_set]
103            }
104        }
105
106    def single_task_registration_or_immediate_processing(self, request: Optional[EventBusRequest]=None
107                                                         ) -> Tuple[bool, Any, None]:
108        if request is not None:
109            return self.resolve_request(request)
110        return True, None, None
111
112    def full_processing_iteration(self):
113        if self._in_processing:
114            return
115
116        handlers_buff = self.handlers
117        self.handlers = type(handlers_buff)()
118
119        events_buff = self.events
120        self.events = type(events_buff)()
121
122        interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys()
123        if interested_events:
124            async def event_processing_coro(interface: Interface,
125                                    interested_events: Set[EventID],
126                                    events: Dict[EventID, List[Any]],
127                                    handlers: Dict[EventID, Set[Handler]],
128                                    priority: CoroPriority
129                                    ):
130                loop: CoroSchedulerType = interface._loop
131                current_coro_interface_buff: Interface = loop.current_coro_interface
132                ly = await agly(priority)
133                for event in interested_events:
134                    event_data_list = events[event]
135                    for data in event_data_list:
136                        event_handlers = handlers[event]
137                        for handler in event_handlers:
138                            try:
139                                loop.current_coro_interface = None
140                                handler(event, data)
141                            except:
142                                loop.logger.exception('EventBus. Event handler error')
143                            finally:
144                                loop.current_coro_interface = current_coro_interface_buff
145                            
146                            await ly()
147
148            coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro,
149                                       interested_events, events_buff, handlers_buff,
150                                       self.priority)
151            # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
152            coro.add_on_coro_del_handler(self._on_coro_del_handler)
153            self._in_processing = True
154        
155        self.make_dead()
156
157    def _on_register_handler(self, event: EventID, handler: Handler):
158        if event not in self.handlers:
159            self.handlers[event] = set()
160
161        self.handlers[event].add(handler)
162        self.make_live()
163        return True, None, None
164
165    def _on_remove_handler(self, event: EventID, handler: Handler):
166        if event in self.handlers:
167            self.handlers[event].remove(handler)
168
169        return True, None, None
170
171    def _on_send_event(self, event: EventID, data: Any):
172        if event not in self.events:
173            self.events[event] = list()
174
175            self.events[event].append(data)
176        
177        self.make_live()
178        return True, None, None
179
180    def _on_set_priority(self, priority: CoroPriority):
181        self.priority = priority
182
183        return True, None, None
184
185    def in_work(self):
186        result = (not self._in_processing) & bool(self.handlers) & bool(self.events)
187        return self.thrifty_in_work(result)
188
189    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
190        self._in_processing = False
191        if bool(self.handlers) & bool(self.events):
192            self.make_live()
193        
194        return False
195
196
# Bind the request class to its service; presumably this lets a request be
# dispatched without naming EventBus explicitly -- TODO confirm.
EventBusRequest.default_service_type = EventBus
198
199
def try_send_event(
        backup_scheduler: Optional[CoroSchedulerType],
        event: EventID, data: Any):
    """Publish ``data`` under ``event`` on the EventBus via a helper coroutine.

    A small coroutine function that issues the ``send_event`` request is
    passed to ``try_put_coro_to`` together with the target obtained from
    ``get_interface_and_loop_with_explicit_loop(backup_scheduler)``.
    Nothing is returned.
    """
    def event_sender(
            interface: Interface,
            event: EventID, data: Any):
        # Build the request first, then hand it to the service.
        request = EventBusRequest().send_event(event, data)
        interface(EventBus, request)

    target = get_interface_and_loop_with_explicit_loop(backup_scheduler)
    try_put_coro_to(target, CoroType.auto, event_sender, event, data)
class EventBus(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
 69class EventBus(Service, EntityStatsMixin):
 70    def __init__(self, loop: CoroSchedulerType):
 71        super(EventBus, self).__init__(loop)
 72
 73        self._request_workers = {
 74            0: self._on_register_handler,
 75            1: self._on_remove_handler,
 76            2: self._on_send_event,
 77            3: self._on_set_priority,
 78        }
 79
 80        self.handlers: Dict[EventID, Set[Handler]] = dict()
 81        self.events: Dict[EventID, List[Any]] = dict()
 82        self.priority: CoroPriority = CoroPriority.low
 83        self._in_processing: bool = False
 84
 85    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 86        if EntityStatsMixin.StatsLevel.info == stats_level:
 87            func_info = full_func_info_to_printable_dict
 88        else:
 89            func_info = full_func_info_to_dict
 90
 91        handlers_set: Set = set()
 92        for handlers in self.handlers.values():
 93            handlers_set += handlers
 94            
 95        return type(self).__name__, {
 96            'priority': str(self.priority),
 97            'events': {
 98                'num': len(self.events),
 99                'list': set(self.events)
100            },
101            'handlers': {
102                'num': len(handlers_set),
103                'list': [func_info(handler) for handler in handlers_set]
104            }
105        }
106
107    def single_task_registration_or_immediate_processing(self, request: Optional[EventBusRequest]=None
108                                                         ) -> Tuple[bool, Any, None]:
109        if request is not None:
110            return self.resolve_request(request)
111        return True, None, None
112
113    def full_processing_iteration(self):
114        if self._in_processing:
115            return
116
117        handlers_buff = self.handlers
118        self.handlers = type(handlers_buff)()
119
120        events_buff = self.events
121        self.events = type(events_buff)()
122
123        interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys()
124        if interested_events:
125            async def event_processing_coro(interface: Interface,
126                                    interested_events: Set[EventID],
127                                    events: Dict[EventID, List[Any]],
128                                    handlers: Dict[EventID, Set[Handler]],
129                                    priority: CoroPriority
130                                    ):
131                loop: CoroSchedulerType = interface._loop
132                current_coro_interface_buff: Interface = loop.current_coro_interface
133                ly = await agly(priority)
134                for event in interested_events:
135                    event_data_list = events[event]
136                    for data in event_data_list:
137                        event_handlers = handlers[event]
138                        for handler in event_handlers:
139                            try:
140                                loop.current_coro_interface = None
141                                handler(event, data)
142                            except:
143                                loop.logger.exception('EventBus. Event handler error')
144                            finally:
145                                loop.current_coro_interface = current_coro_interface_buff
146                            
147                            await ly()
148
149            coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro,
150                                       interested_events, events_buff, handlers_buff,
151                                       self.priority)
152            # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
153            coro.add_on_coro_del_handler(self._on_coro_del_handler)
154            self._in_processing = True
155        
156        self.make_dead()
157
158    def _on_register_handler(self, event: EventID, handler: Handler):
159        if event not in self.handlers:
160            self.handlers[event] = set()
161
162        self.handlers[event].add(handler)
163        self.make_live()
164        return True, None, None
165
166    def _on_remove_handler(self, event: EventID, handler: Handler):
167        if event in self.handlers:
168            self.handlers[event].remove(handler)
169
170        return True, None, None
171
172    def _on_send_event(self, event: EventID, data: Any):
173        if event not in self.events:
174            self.events[event] = list()
175
176            self.events[event].append(data)
177        
178        self.make_live()
179        return True, None, None
180
181    def _on_set_priority(self, priority: CoroPriority):
182        self.priority = priority
183
184        return True, None, None
185
186    def in_work(self):
187        result = (not self._in_processing) & bool(self.handlers) & bool(self.events)
188        return self.thrifty_in_work(result)
189
190    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
191        self._in_processing = False
192        if bool(self.handlers) & bool(self.events):
193            self.make_live()
194        
195        return False
EventBus( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
70    def __init__(self, loop: CoroSchedulerType):
71        super(EventBus, self).__init__(loop)
72
73        self._request_workers = {
74            0: self._on_register_handler,
75            1: self._on_remove_handler,
76            2: self._on_send_event,
77            3: self._on_set_priority,
78        }
79
80        self.handlers: Dict[EventID, Set[Handler]] = dict()
81        self.events: Dict[EventID, List[Any]] = dict()
82        self.priority: CoroPriority = CoroPriority.low
83        self._in_processing: bool = False
handlers: Dict[str, Set[Callable[[str, Any], NoneType]]]
events: Dict[str, List[Any]]
priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
 85    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 86        if EntityStatsMixin.StatsLevel.info == stats_level:
 87            func_info = full_func_info_to_printable_dict
 88        else:
 89            func_info = full_func_info_to_dict
 90
 91        handlers_set: Set = set()
 92        for handlers in self.handlers.values():
 93            handlers_set += handlers
 94            
 95        return type(self).__name__, {
 96            'priority': str(self.priority),
 97            'events': {
 98                'num': len(self.events),
 99                'list': set(self.events)
100            },
101            'handlers': {
102                'num': len(handlers_set),
103                'list': [func_info(handler) for handler in handlers_set]
104            }
105        }
def single_task_registration_or_immediate_processing( self, request: Union[EventBusRequest, NoneType] = None) -> Tuple[bool, Any, NoneType]:
107    def single_task_registration_or_immediate_processing(self, request: Optional[EventBusRequest]=None
108                                                         ) -> Tuple[bool, Any, None]:
109        if request is not None:
110            return self.resolve_request(request)
111        return True, None, None
def full_processing_iteration(self):
113    def full_processing_iteration(self):
114        if self._in_processing:
115            return
116
117        handlers_buff = self.handlers
118        self.handlers = type(handlers_buff)()
119
120        events_buff = self.events
121        self.events = type(events_buff)()
122
123        interested_events: Set[EventID] = handlers_buff.keys() & events_buff.keys()
124        if interested_events:
125            async def event_processing_coro(interface: Interface,
126                                    interested_events: Set[EventID],
127                                    events: Dict[EventID, List[Any]],
128                                    handlers: Dict[EventID, Set[Handler]],
129                                    priority: CoroPriority
130                                    ):
131                loop: CoroSchedulerType = interface._loop
132                current_coro_interface_buff: Interface = loop.current_coro_interface
133                ly = await agly(priority)
134                for event in interested_events:
135                    event_data_list = events[event]
136                    for data in event_data_list:
137                        event_handlers = handlers[event]
138                        for handler in event_handlers:
139                            try:
140                                loop.current_coro_interface = None
141                                handler(event, data)
142                            except:
143                                loop.logger.exception('EventBus. Event handler error')
144                            finally:
145                                loop.current_coro_interface = current_coro_interface_buff
146                            
147                            await ly()
148
149            coro: CoroWrapperBase = self._loop.put_coro(event_processing_coro,
150                                       interested_events, events_buff, handlers_buff,
151                                       self.priority)
152            # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
153            coro.add_on_coro_del_handler(self._on_coro_del_handler)
154            self._in_processing = True
155        
156        self.make_dead()
def in_work(self):
186    def in_work(self):
187        result = (not self._in_processing) & bool(self.handlers) & bool(self.events)
188        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

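Raises: NotImplementedError: listed by this docstring, which is presumably inherited from the base Service class; the EventBus.in_work() override shown above does not raise it itself.

Returns: bool: the value returned by thrifty_in_work(result), where result is True only when no event-processing coroutine is running and both the handlers and the events containers are non-empty.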

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class EventBusRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class EventBusRequest(ServiceRequest):
    """Builder of requests addressed to the ``EventBus`` service.

    Every method stores a numeric request type plus its arguments through
    ``_save`` and returns whatever ``_save`` returns. The numeric type selects
    the worker in ``EventBus._request_workers``.

    NOTE(review): the ``-> None`` annotations do not describe the value
    returned by ``_save``; presumably they describe the service response seen
    by the requesting coroutine -- TODO confirm.
    """

    def register_handler(self, event: EventID, handler: Handler) -> None:
        """Build a request (type 0) that subscribes ``handler`` to ``event``."""
        request_type = 0
        return self._save(request_type, event, handler)

    def remove_handler(self, event: EventID, handler: Handler) -> None:
        """Build a request (type 1) that unsubscribes ``handler`` from ``event``."""
        request_type = 1
        return self._save(request_type, event, handler)

    def send_event(self, event: EventID, data: Any) -> None:
        """Build a request (type 2) that publishes ``data`` under ``event``."""
        request_type = 2
        return self._save(request_type, event, data)

    def set_priority(self, priority: CoroPriority) -> None:
        """Build a request (type 3) that changes the bus processing priority."""
        request_type = 3
        return self._save(request_type, priority)
def register_handler(self, event: str, handler: Callable[[str, Any], NoneType]) -> None:
56    def register_handler(self, event: EventID, handler: Handler) -> None:
57        return self._save(0, event, handler)
def remove_handler(self, event: str, handler: Callable[[str, Any], NoneType]) -> None:
59    def remove_handler(self, event: EventID, handler: Handler) -> None:
60        return self._save(1, event, handler)
def send_event(self, event: str, data: Any) -> None:
62    def send_event(self, event: EventID, data: Any) -> None:
63        return self._save(2, event, data)
def set_priority( self, priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority) -> None:
65    def set_priority(self, priority: CoroPriority) -> None:
66        return self._save(3, priority)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'EventBus'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
EventID = <class 'str'>
Handler = typing.Callable[[str, typing.Any], NoneType]