cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus.versions.v_0.async_event_bus

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = [
 20    'AsyncEventBus', 'AsyncEventBusRequest', 'EventID', 'Handler', 'try_send_async_event'
 21]
 22
 23
 24import sys
 25from enum import Enum
 26from typing import Dict, Set, List, Tuple, Union, Type, Optional, Any, Deque
 27from cengal.parallel_execution.coroutines.coro_scheduler import *
 28from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 29from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import *
 30from cengal.parallel_execution.coroutines.coro_standard_services.event_bus import EventID, Handler
 31from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
 32from cengal.code_flow_control.smart_values import ValueExistence
 33from cengal.introspection.inspect import get_exception
 34from cengal.time_management.repeat_for_a_time import Tracer
 35from collections import deque
 36
 37
 38"""
 39Module Docstring
 40Docstrings: http://www.python.org/dev/peps/pep-0257/
 41"""
 42
 43__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 44__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 45__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 46__license__ = "Apache License, Version 2.0"
 47__version__ = "4.4.1"
 48__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 49__email__ = "gtalk@butenkoms.space"
 50# __status__ = "Prototype"
 51__status__ = "Development"
 52# __status__ = "Production"
 53
 54
 55class AsyncEventBusRequest(ServiceRequest):
 56    def register_handler(self, event: EventID, handler: Handler) -> None:
 57        return self._save(0, event, handler)
 58
 59    def remove_handler(self, event: EventID, handler: Handler) -> None:
 60        return self._save(1, event, handler)
 61
 62    def send_event(self, event: EventID, data: Any,
 63                   priority: Optional[CoroPriority]=CoroPriority.low) -> None:
 64        return self._save(2, event, data, priority)
 65
 66    def get_event(self, event: EventID, default: Any = None) -> ValueExistence[Any]:
 67        return self._save(3, event, default)
 68
 69    def flush_event_data(self, event: EventID) -> ValueExistence[Any]:
 70        return self._save(5, event)
 71
 72    def wait(self, event: EventID) -> Any:
 73        """
 74        Will block coroutine untill result is ready
 75        :param event:
 76        :return: event data
 77        """
 78        return self._save(4, event)
 79
 80
 81class AsyncEventBus(ServiceWithADirectRequestMixin, Service, EntityStatsMixin):
 82    def __init__(self, loop: CoroSchedulerType):
 83        super(AsyncEventBus, self).__init__(loop)
 84
 85        self._request_workers = {
 86            0: self._on_register_handler,
 87            1: self._on_remove_handler,
 88            2: self._on_send_event,
 89            3: self._on_get_event,
 90            4: self._on_wait,
 91            5: self._on_flush_event_data,
 92        }
 93
 94        self.waiters: Dict[EventID, Set[CoroID]] = dict()
 95        self.events_by_waiter: Dict[CoroID, Set[Any]] = dict()
 96        self.handlers: Dict[EventID, Set[Handler]] = dict()
 97        self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = {
 98            CoroPriority.low: dict(),
 99            CoroPriority.normal: dict(),
100            CoroPriority.high: dict()
101        }
102        self.priority_sequence = [
103            CoroPriority.high,
104            CoroPriority.normal,
105            CoroPriority.low
106        ]
107
108    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
109        if EntityStatsMixin.StatsLevel.info == stats_level:
110            func_info = full_func_info_to_printable_dict
111        else:
112            func_info = full_func_info_to_dict
113
114        coroutines_set: Set = set()
115        for coroutines in self.waiters.values():
116            coroutines_set.update(coroutines)
117
118        handlers_set: Set = set()
119        for handlers in self.handlers.values():
120            handlers_set.update(handlers)
121            
122        return type(self).__name__, {
123            'events': {
124                'low priority': {
125                    'num': len(self.events[CoroPriority.low]),
126                    'list': set(self.events[CoroPriority.low])
127                },
128                'normal priority': {
129                    'num': len(self.events[CoroPriority.normal]),
130                    'list': set(self.events[CoroPriority.normal])
131                },
132                'high priority': {
133                    'num': len(self.events[CoroPriority.high]),
134                    'list': set(self.events[CoroPriority.high])
135                },
136            },
137            'waiting coroutines': {
138                'num': len(coroutines_set),
139                'list': coroutines_set,
140            },
141            'handlers': {
142                'num': len(handlers_set),
143                'list': [func_info(handler) for handler in handlers_set]
144            }
145        }
146
147    def single_task_registration_or_immediate_processing(self, request: Optional[AsyncEventBusRequest]=None
148                                                         ) -> Tuple[bool, Any, None]:
149        if request is not None:
150            return self.resolve_request(request)
151        return True, None, None
152
153    def full_processing_iteration(self):
154        handlers = self.handlers
155        waiters = self.waiters
156        for priority in self.priority_sequence:
157            priority_events = self.events[priority]
158            if not priority_events:
159                continue
160            else:
161                interested_events = waiters.keys() & priority_events.keys()
162                for event in interested_events:
163                    event_data_list: Deque = priority_events[event]
164                    for i in range(len(event_data_list)):
165                        event_waiters = waiters.pop(event, None)
166                        if event_waiters is None:
167                            break
168                        else:
169                            for waiter in event_waiters:
170                                waiter_events = self.events_by_waiter.get(waiter, None)
171                                if waiter_events:
172                                    if event in waiter_events:
173                                        waiter_events.remove(event)
174                                        self.register_response(waiter, event_data_list.popleft())
175                
176                    if not event_data_list:
177                        del priority_events[event]
178
179                interested_events = handlers.keys() & priority_events.keys()
180            
181            if interested_events:
182                processed_priority_events = set()
183                handlers_info_list = list()
184                for event in interested_events:
185                    event_data_list = priority_events[event]
186                    processed_priority_events.add(event)
187                    for data in event_data_list:
188                        event_handlers = handlers.get(event, None)
189                        if event_handlers:
190                            for handler in event_handlers:
191                                handlers_info_list.append((handler, event, data))
192
193                async def event_processing_coro(
194                    interface: Interface,
195                    handlers_info_list: List[Tuple[(Handler, EventID, Any)]],
196                    priority: CoroPriority,
197                ):
198                    loop: CoroSchedulerType = interface._loop
199                    current_coro_interface_buff: Interface = loop.current_coro_interface
200                    ly = await agly(priority)
201                    for handler, event, data in handlers_info_list:
202                        try:
203                            loop.current_coro_interface = None
204                            handler(event, data)
205                        except:
206                            loop.logger.exception('AsyncEventBus. Event handler error')
207                        finally:
208                            loop.current_coro_interface = current_coro_interface_buff
209
210                        await ly()
211
212                try:
213                    coro: CoroWrapperBase = self._loop.put_coro(
214                        event_processing_coro,
215                        handlers_info_list,
216                        priority,
217                    )
218                    # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
219                except:
220                    ex_type, exception, tracback = sys.exc_info()
221                    raise
222                finally:
223                    for processed_event in processed_priority_events:
224                        del priority_events[processed_event]
225
226        self.make_dead()
227
228    def _on_register_handler(self, event: EventID, handler: Handler):
229        if event not in self.handlers:
230            self.handlers[event] = set()
231
232        self.handlers[event].add(handler)
233        self.make_live()
234        return True, None, None
235
236    def _on_remove_handler(self, event: EventID, handler: Handler):
237        if event in self.handlers:
238            self.handlers[event].discard(handler)
239            if not self.handlers[event]:
240                del self.handlers[event]
241
242        return True, None, None
243
244    def _on_send_event(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
245        events_of_priority = self.events[priority]
246        if event not in events_of_priority:
247            events_of_priority[event] = deque()
248
249        events_of_priority[event].append(data)
250        self.make_live()
251
252        return True, None, None
253
254    def _add_direct_request(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low) -> ValueExistence:
255        self._on_send_event(event, data, priority)
256        return (True, None)
257
258    def _on_get_event(self, event: EventID, default: Any = None) -> Tuple[bool, ValueExistence, None]:
259        for _, events in self.events.items():
260            if event in events:
261                return (True, events[event].popleft(), None)
262        
263        return (True, (False, default), None)
264
265    def _on_flush_event_data(self, event: EventID):
266        for _, events in self.events.items():
267            if event in events:
268                del events[event]
269                return (True, True, None)
270        
271        return (True, False, None)
272
273    def _on_wait(self, event: EventID):
274        requester_id = self.current_caller_coro_info.coro_id
275        requester = self.current_caller_coro_info.coro
276        requester.add_on_coro_del_handler(self._on_coro_del_handler)
277        if event not in self.waiters:
278            self.waiters[event] = set()
279
280        if requester_id not in self.events_by_waiter:
281            self.events_by_waiter[requester_id] = set()
282
283        self.waiters[event].add(requester_id)
284        self.events_by_waiter[requester_id].add(event)
285        return (False, None, None)
286
287    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
288        coro_id = coro.coro_id
289        if coro_id in self.events_by_waiter:
290            events = self.events_by_waiter[coro_id]
291            for event in events:
292                if event in self.waiters:
293                    if coro_id in self.waiters[event]:
294                        self.waiters[event].remove(coro_id)
295
296            del self.events_by_waiter[coro_id]
297        
298        return False
299
300    def in_work(self):
301        result: bool = bool(self.waiters) | bool(self.handlers)
302        for priority in self.priority_sequence:
303            result |= bool(self.events[priority])
304        
305        return self.thrifty_in_work(result)
306
307
308AsyncEventBusRequest.default_service_type = AsyncEventBus
309
310
def try_send_async_event(
        backup_scheduler: Optional[CoroSchedulerType],
        event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
    """Schedule a small coroutine that sends ``event`` to the AsyncEventBus.

    The target is obtained from
    ``get_interface_and_loop_with_explicit_loop(backup_scheduler)`` and the
    sender coroutine is handed to ``try_put_coro_to``.

    :param backup_scheduler: scheduler passed to
        ``get_interface_and_loop_with_explicit_loop``
    :param event: event identifier
    :param data: event payload
    :param priority: priority bucket for the event
    """
    def event_sender(
            interface: Interface,
            event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
        send_request = AsyncEventBusRequest().send_event(event, data, priority)
        interface(AsyncEventBus, send_request)

    target = get_interface_and_loop_with_explicit_loop(backup_scheduler)
    try_put_coro_to(target, event_sender, event, data, priority)
class AsyncEventBus(cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
 82class AsyncEventBus(ServiceWithADirectRequestMixin, Service, EntityStatsMixin):
 83    def __init__(self, loop: CoroSchedulerType):
 84        super(AsyncEventBus, self).__init__(loop)
 85
 86        self._request_workers = {
 87            0: self._on_register_handler,
 88            1: self._on_remove_handler,
 89            2: self._on_send_event,
 90            3: self._on_get_event,
 91            4: self._on_wait,
 92            5: self._on_flush_event_data,
 93        }
 94
 95        self.waiters: Dict[EventID, Set[CoroID]] = dict()
 96        self.events_by_waiter: Dict[CoroID, Set[Any]] = dict()
 97        self.handlers: Dict[EventID, Set[Handler]] = dict()
 98        self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = {
 99            CoroPriority.low: dict(),
100            CoroPriority.normal: dict(),
101            CoroPriority.high: dict()
102        }
103        self.priority_sequence = [
104            CoroPriority.high,
105            CoroPriority.normal,
106            CoroPriority.low
107        ]
108
109    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
110        if EntityStatsMixin.StatsLevel.info == stats_level:
111            func_info = full_func_info_to_printable_dict
112        else:
113            func_info = full_func_info_to_dict
114
115        coroutines_set: Set = set()
116        for coroutines in self.waiters.values():
117            coroutines_set.update(coroutines)
118
119        handlers_set: Set = set()
120        for handlers in self.handlers.values():
121            handlers_set.update(handlers)
122            
123        return type(self).__name__, {
124            'events': {
125                'low priority': {
126                    'num': len(self.events[CoroPriority.low]),
127                    'list': set(self.events[CoroPriority.low])
128                },
129                'normal priority': {
130                    'num': len(self.events[CoroPriority.normal]),
131                    'list': set(self.events[CoroPriority.normal])
132                },
133                'high priority': {
134                    'num': len(self.events[CoroPriority.high]),
135                    'list': set(self.events[CoroPriority.high])
136                },
137            },
138            'waiting coroutines': {
139                'num': len(coroutines_set),
140                'list': coroutines_set,
141            },
142            'handlers': {
143                'num': len(handlers_set),
144                'list': [func_info(handler) for handler in handlers_set]
145            }
146        }
147
148    def single_task_registration_or_immediate_processing(self, request: Optional[AsyncEventBusRequest]=None
149                                                         ) -> Tuple[bool, Any, None]:
150        if request is not None:
151            return self.resolve_request(request)
152        return True, None, None
153
154    def full_processing_iteration(self):
155        handlers = self.handlers
156        waiters = self.waiters
157        for priority in self.priority_sequence:
158            priority_events = self.events[priority]
159            if not priority_events:
160                continue
161            else:
162                interested_events = waiters.keys() & priority_events.keys()
163                for event in interested_events:
164                    event_data_list: Deque = priority_events[event]
165                    for i in range(len(event_data_list)):
166                        event_waiters = waiters.pop(event, None)
167                        if event_waiters is None:
168                            break
169                        else:
170                            for waiter in event_waiters:
171                                waiter_events = self.events_by_waiter.get(waiter, None)
172                                if waiter_events:
173                                    if event in waiter_events:
174                                        waiter_events.remove(event)
175                                        self.register_response(waiter, event_data_list.popleft())
176                
177                    if not event_data_list:
178                        del priority_events[event]
179
180                interested_events = handlers.keys() & priority_events.keys()
181            
182            if interested_events:
183                processed_priority_events = set()
184                handlers_info_list = list()
185                for event in interested_events:
186                    event_data_list = priority_events[event]
187                    processed_priority_events.add(event)
188                    for data in event_data_list:
189                        event_handlers = handlers.get(event, None)
190                        if event_handlers:
191                            for handler in event_handlers:
192                                handlers_info_list.append((handler, event, data))
193
194                async def event_processing_coro(
195                    interface: Interface,
196                    handlers_info_list: List[Tuple[(Handler, EventID, Any)]],
197                    priority: CoroPriority,
198                ):
199                    loop: CoroSchedulerType = interface._loop
200                    current_coro_interface_buff: Interface = loop.current_coro_interface
201                    ly = await agly(priority)
202                    for handler, event, data in handlers_info_list:
203                        try:
204                            loop.current_coro_interface = None
205                            handler(event, data)
206                        except:
207                            loop.logger.exception('AsyncEventBus. Event handler error')
208                        finally:
209                            loop.current_coro_interface = current_coro_interface_buff
210
211                        await ly()
212
213                try:
214                    coro: CoroWrapperBase = self._loop.put_coro(
215                        event_processing_coro,
216                        handlers_info_list,
217                        priority,
218                    )
219                    # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
220                except:
221                    ex_type, exception, tracback = sys.exc_info()
222                    raise
223                finally:
224                    for processed_event in processed_priority_events:
225                        del priority_events[processed_event]
226
227        self.make_dead()
228
229    def _on_register_handler(self, event: EventID, handler: Handler):
230        if event not in self.handlers:
231            self.handlers[event] = set()
232
233        self.handlers[event].add(handler)
234        self.make_live()
235        return True, None, None
236
237    def _on_remove_handler(self, event: EventID, handler: Handler):
238        if event in self.handlers:
239            self.handlers[event].discard(handler)
240            if not self.handlers[event]:
241                del self.handlers[event]
242
243        return True, None, None
244
245    def _on_send_event(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
246        events_of_priority = self.events[priority]
247        if event not in events_of_priority:
248            events_of_priority[event] = deque()
249
250        events_of_priority[event].append(data)
251        self.make_live()
252
253        return True, None, None
254
255    def _add_direct_request(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low) -> ValueExistence:
256        self._on_send_event(event, data, priority)
257        return (True, None)
258
259    def _on_get_event(self, event: EventID, default: Any = None) -> Tuple[bool, ValueExistence, None]:
260        for _, events in self.events.items():
261            if event in events:
262                return (True, events[event].popleft(), None)
263        
264        return (True, (False, default), None)
265
266    def _on_flush_event_data(self, event: EventID):
267        for _, events in self.events.items():
268            if event in events:
269                del events[event]
270                return (True, True, None)
271        
272        return (True, False, None)
273
274    def _on_wait(self, event: EventID):
275        requester_id = self.current_caller_coro_info.coro_id
276        requester = self.current_caller_coro_info.coro
277        requester.add_on_coro_del_handler(self._on_coro_del_handler)
278        if event not in self.waiters:
279            self.waiters[event] = set()
280
281        if requester_id not in self.events_by_waiter:
282            self.events_by_waiter[requester_id] = set()
283
284        self.waiters[event].add(requester_id)
285        self.events_by_waiter[requester_id].add(event)
286        return (False, None, None)
287
288    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
289        coro_id = coro.coro_id
290        if coro_id in self.events_by_waiter:
291            events = self.events_by_waiter[coro_id]
292            for event in events:
293                if event in self.waiters:
294                    if coro_id in self.waiters[event]:
295                        self.waiters[event].remove(coro_id)
296
297            del self.events_by_waiter[coro_id]
298        
299        return False
300
301    def in_work(self):
302        result: bool = bool(self.waiters) | bool(self.handlers)
303        for priority in self.priority_sequence:
304            result |= bool(self.events[priority])
305        
306        return self.thrifty_in_work(result)
AsyncEventBus( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
 83    def __init__(self, loop: CoroSchedulerType):
 84        super(AsyncEventBus, self).__init__(loop)
 85
 86        self._request_workers = {
 87            0: self._on_register_handler,
 88            1: self._on_remove_handler,
 89            2: self._on_send_event,
 90            3: self._on_get_event,
 91            4: self._on_wait,
 92            5: self._on_flush_event_data,
 93        }
 94
 95        self.waiters: Dict[EventID, Set[CoroID]] = dict()
 96        self.events_by_waiter: Dict[CoroID, Set[Any]] = dict()
 97        self.handlers: Dict[EventID, Set[Handler]] = dict()
 98        self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = {
 99            CoroPriority.low: dict(),
100            CoroPriority.normal: dict(),
101            CoroPriority.high: dict()
102        }
103        self.priority_sequence = [
104            CoroPriority.high,
105            CoroPriority.normal,
106            CoroPriority.low
107        ]
waiters: Dict[str, Set[int]]
events_by_waiter: Dict[int, Set[Any]]
handlers: Dict[str, Set[Callable[[str, Any], NoneType]]]
events: Dict[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, Dict[str, Deque[Any]]]
priority_sequence
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
109    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
110        if EntityStatsMixin.StatsLevel.info == stats_level:
111            func_info = full_func_info_to_printable_dict
112        else:
113            func_info = full_func_info_to_dict
114
115        coroutines_set: Set = set()
116        for coroutines in self.waiters.values():
117            coroutines_set.update(coroutines)
118
119        handlers_set: Set = set()
120        for handlers in self.handlers.values():
121            handlers_set.update(handlers)
122            
123        return type(self).__name__, {
124            'events': {
125                'low priority': {
126                    'num': len(self.events[CoroPriority.low]),
127                    'list': set(self.events[CoroPriority.low])
128                },
129                'normal priority': {
130                    'num': len(self.events[CoroPriority.normal]),
131                    'list': set(self.events[CoroPriority.normal])
132                },
133                'high priority': {
134                    'num': len(self.events[CoroPriority.high]),
135                    'list': set(self.events[CoroPriority.high])
136                },
137            },
138            'waiting coroutines': {
139                'num': len(coroutines_set),
140                'list': coroutines_set,
141            },
142            'handlers': {
143                'num': len(handlers_set),
144                'list': [func_info(handler) for handler in handlers_set]
145            }
146        }
def single_task_registration_or_immediate_processing( self, request: Union[AsyncEventBusRequest, NoneType] = None) -> Tuple[bool, Any, NoneType]:
148    def single_task_registration_or_immediate_processing(self, request: Optional[AsyncEventBusRequest]=None
149                                                         ) -> Tuple[bool, Any, None]:
150        if request is not None:
151            return self.resolve_request(request)
152        return True, None, None
def full_processing_iteration(self):
154    def full_processing_iteration(self):
155        handlers = self.handlers
156        waiters = self.waiters
157        for priority in self.priority_sequence:
158            priority_events = self.events[priority]
159            if not priority_events:
160                continue
161            else:
162                interested_events = waiters.keys() & priority_events.keys()
163                for event in interested_events:
164                    event_data_list: Deque = priority_events[event]
165                    for i in range(len(event_data_list)):
166                        event_waiters = waiters.pop(event, None)
167                        if event_waiters is None:
168                            break
169                        else:
170                            for waiter in event_waiters:
171                                waiter_events = self.events_by_waiter.get(waiter, None)
172                                if waiter_events:
173                                    if event in waiter_events:
174                                        waiter_events.remove(event)
175                                        self.register_response(waiter, event_data_list.popleft())
176                
177                    if not event_data_list:
178                        del priority_events[event]
179
180                interested_events = handlers.keys() & priority_events.keys()
181            
182            if interested_events:
183                processed_priority_events = set()
184                handlers_info_list = list()
185                for event in interested_events:
186                    event_data_list = priority_events[event]
187                    processed_priority_events.add(event)
188                    for data in event_data_list:
189                        event_handlers = handlers.get(event, None)
190                        if event_handlers:
191                            for handler in event_handlers:
192                                handlers_info_list.append((handler, event, data))
193
194                async def event_processing_coro(
195                    interface: Interface,
196                    handlers_info_list: List[Tuple[(Handler, EventID, Any)]],
197                    priority: CoroPriority,
198                ):
199                    loop: CoroSchedulerType = interface._loop
200                    current_coro_interface_buff: Interface = loop.current_coro_interface
201                    ly = await agly(priority)
202                    for handler, event, data in handlers_info_list:
203                        try:
204                            loop.current_coro_interface = None
205                            handler(event, data)
206                        except:
207                            loop.logger.exception('AsyncEventBus. Event handler error')
208                        finally:
209                            loop.current_coro_interface = current_coro_interface_buff
210
211                        await ly()
212
213                try:
214                    coro: CoroWrapperBase = self._loop.put_coro(
215                        event_processing_coro,
216                        handlers_info_list,
217                        priority,
218                    )
219                    # coro.is_background_coro = True  # must not be background coro: it is not an endless coro
220                except:
221                    ex_type, exception, tracback = sys.exc_info()
222                    raise
223                finally:
224                    for processed_event in processed_priority_events:
225                        del priority_events[processed_event]
226
227        self.make_dead()
def in_work(self):
301    def in_work(self):
302        result: bool = bool(self.waiters) | bool(self.handlers)
303        for priority in self.priority_sequence:
304            result |= bool(self.events[priority])
305        
306        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

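Raises: NotImplementedError: presumably raised by the base Service implementation when the method is not overridden (to be confirmed); the AsyncEventBus override listed above does not raise it explicitly.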

Returns: bool: the result of thrifty_in_work() applied to the "has pending work" flag (any waiters, handlers or queued events).

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class AsyncEventBusRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class AsyncEventBusRequest(ServiceRequest):
    """Request object addressed to the AsyncEventBus service.

    Every method records a numeric request type together with its arguments
    through ``self._save`` and returns whatever ``_save`` returns.

    Request type codes used by this class:
    0 - register_handler, 1 - remove_handler, 2 - send_event,
    3 - get_event, 4 - wait, 5 - flush_event_data.

    NOTE(review): the return annotations appear to describe the value the
    requesting coroutine eventually receives rather than the value returned
    by these methods - confirm against ServiceRequest.
    """

    def register_handler(self, event: EventID, handler: Handler) -> None:
        """Save a type-0 request carrying ``event`` and ``handler``."""
        request_type = 0
        return self._save(request_type, event, handler)

    def remove_handler(self, event: EventID, handler: Handler) -> None:
        """Save a type-1 request carrying ``event`` and ``handler``."""
        request_type = 1
        return self._save(request_type, event, handler)

    def send_event(
        self,
        event: EventID,
        data: Any,
        priority: Optional[CoroPriority] = CoroPriority.low,
    ) -> None:
        """Save a type-2 request carrying ``event``, ``data`` and ``priority``.

        ``priority`` defaults to ``CoroPriority.low``.
        """
        request_type = 2
        return self._save(request_type, event, data, priority)

    def get_event(self, event: EventID, default: Any = None) -> ValueExistence[Any]:
        """Save a type-3 request carrying ``event`` and ``default``."""
        request_type = 3
        return self._save(request_type, event, default)

    def flush_event_data(self, event: EventID) -> ValueExistence[Any]:
        """Save a type-5 request carrying ``event``."""
        request_type = 5
        return self._save(request_type, event)

    def wait(self, event: EventID) -> Any:
        """
        Will block the coroutine until the result is ready
        :param event:
        :return: event data
        """
        request_type = 4
        return self._save(request_type, event)
def register_handler(self, event: str, handler: Callable[[str, Any], NoneType]) -> None:
57    def register_handler(self, event: EventID, handler: Handler) -> None:
58        return self._save(0, event, handler)
def remove_handler(self, event: str, handler: Callable[[str, Any], NoneType]) -> None:
60    def remove_handler(self, event: EventID, handler: Handler) -> None:
61        return self._save(1, event, handler)
def send_event( self, event: str, data: Any, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = <CoroPriority.low: 2>) -> None:
63    def send_event(self, event: EventID, data: Any,
64                   priority: Optional[CoroPriority]=CoroPriority.low) -> None:
65        return self._save(2, event, data, priority)
def get_event( self, event: str, default: Any = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Any]:
67    def get_event(self, event: EventID, default: Any = None) -> ValueExistence[Any]:
68        return self._save(3, event, default)
def flush_event_data( self, event: str) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Any]:
70    def flush_event_data(self, event: EventID) -> ValueExistence[Any]:
71        return self._save(5, event)
def wait(self, event: str) -> Any:
73    def wait(self, event: EventID) -> Any:
74        """
75        Will block coroutine untill result is ready
76        :param event:
77        :return: event data
78        """
79        return self._save(4, event)

Will block the coroutine until the result is ready :param event: :return: event data

default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'AsyncEventBus'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
EventID = <class 'str'>
Handler = typing.Callable[[str, typing.Any], NoneType]
def try_send_async_event( backup_scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], event: str, data: Any, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = <CoroPriority.low: 2>):
def try_send_async_event(
        backup_scheduler: Optional[CoroSchedulerType],
        event: EventID,
        data: Any,
        priority: Optional[CoroPriority] = CoroPriority.low):
    """Try to issue an AsyncEventBus ``send_event`` request via a helper coroutine.

    The value returned by
    ``get_interface_and_loop_with_explicit_loop(backup_scheduler)`` is passed
    to ``try_put_coro_to`` together with a small coroutine function that
    sends the request.

    :param backup_scheduler: scheduler forwarded to
        ``get_interface_and_loop_with_explicit_loop`` (presumably a fallback
        used when no current loop is available - confirm)
    :param event: event identifier
    :param data: event payload
    :param priority: priority forwarded with the request
    :return: None; the result of ``try_put_coro_to`` is not propagated
    """
    def event_sender(
            interface: Interface,
            event: EventID,
            data: Any,
            priority: Optional[CoroPriority] = CoroPriority.low):
        # Coroutine body handed to try_put_coro_to: build the request, then
        # address it to the AsyncEventBus service through the interface.
        send_request = AsyncEventBusRequest().send_event(event, data, priority)
        interface(AsyncEventBus, send_request)

    interface_and_loop = get_interface_and_loop_with_explicit_loop(backup_scheduler)
    try_put_coro_to(interface_and_loop, event_sender, event, data, priority)