cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus.versions.v_0.async_event_bus
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = [ 20 'AsyncEventBus', 'AsyncEventBusRequest', 'EventID', 'Handler', 'try_send_async_event' 21] 22 23 24import sys 25from enum import Enum 26from typing import Dict, Set, List, Tuple, Union, Type, Optional, Any, Deque 27from cengal.parallel_execution.coroutines.coro_scheduler import * 28from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import * 29from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import * 30from cengal.parallel_execution.coroutines.coro_standard_services.event_bus import EventID, Handler 31from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 32from cengal.code_flow_control.smart_values import ValueExistence 33from cengal.introspection.inspect import get_exception 34from cengal.time_management.repeat_for_a_time import Tracer 35from collections import deque 36 37 38""" 39Module Docstring 40Docstrings: http://www.python.org/dev/peps/pep-0257/ 41""" 42 43__author__ = "ButenkoMS <gtalk@butenkoms.space>" 44__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. 
Contacts: <gtalk@butenkoms.space>" 45__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 46__license__ = "Apache License, Version 2.0" 47__version__ = "4.4.1" 48__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 49__email__ = "gtalk@butenkoms.space" 50# __status__ = "Prototype" 51__status__ = "Development" 52# __status__ = "Production" 53 54 55class AsyncEventBusRequest(ServiceRequest): 56 def register_handler(self, event: EventID, handler: Handler) -> None: 57 return self._save(0, event, handler) 58 59 def remove_handler(self, event: EventID, handler: Handler) -> None: 60 return self._save(1, event, handler) 61 62 def send_event(self, event: EventID, data: Any, 63 priority: Optional[CoroPriority]=CoroPriority.low) -> None: 64 return self._save(2, event, data, priority) 65 66 def get_event(self, event: EventID, default: Any = None) -> ValueExistence[Any]: 67 return self._save(3, event, default) 68 69 def flush_event_data(self, event: EventID) -> ValueExistence[Any]: 70 return self._save(5, event) 71 72 def wait(self, event: EventID) -> Any: 73 """ 74 Will block coroutine untill result is ready 75 :param event: 76 :return: event data 77 """ 78 return self._save(4, event) 79 80 81class AsyncEventBus(ServiceWithADirectRequestMixin, Service, EntityStatsMixin): 82 def __init__(self, loop: CoroSchedulerType): 83 super(AsyncEventBus, self).__init__(loop) 84 85 self._request_workers = { 86 0: self._on_register_handler, 87 1: self._on_remove_handler, 88 2: self._on_send_event, 89 3: self._on_get_event, 90 4: self._on_wait, 91 5: self._on_flush_event_data, 92 } 93 94 self.waiters: Dict[EventID, Set[CoroID]] = dict() 95 self.events_by_waiter: Dict[CoroID, Set[Any]] = dict() 96 self.handlers: Dict[EventID, Set[Handler]] = dict() 97 self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = { 98 CoroPriority.low: dict(), 99 CoroPriority.normal: dict(), 100 CoroPriority.high: dict() 101 } 102 self.priority_sequence = [ 103 CoroPriority.high, 104 CoroPriority.normal, 105 
CoroPriority.low 106 ] 107 108 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 109 if EntityStatsMixin.StatsLevel.info == stats_level: 110 func_info = full_func_info_to_printable_dict 111 else: 112 func_info = full_func_info_to_dict 113 114 coroutines_set: Set = set() 115 for coroutines in self.waiters.values(): 116 coroutines_set.update(coroutines) 117 118 handlers_set: Set = set() 119 for handlers in self.handlers.values(): 120 handlers_set.update(handlers) 121 122 return type(self).__name__, { 123 'events': { 124 'low priority': { 125 'num': len(self.events[CoroPriority.low]), 126 'list': set(self.events[CoroPriority.low]) 127 }, 128 'normal priority': { 129 'num': len(self.events[CoroPriority.normal]), 130 'list': set(self.events[CoroPriority.normal]) 131 }, 132 'high priority': { 133 'num': len(self.events[CoroPriority.high]), 134 'list': set(self.events[CoroPriority.high]) 135 }, 136 }, 137 'waiting coroutines': { 138 'num': len(coroutines_set), 139 'list': coroutines_set, 140 }, 141 'handlers': { 142 'num': len(handlers_set), 143 'list': [func_info(handler) for handler in handlers_set] 144 } 145 } 146 147 def single_task_registration_or_immediate_processing(self, request: Optional[AsyncEventBusRequest]=None 148 ) -> Tuple[bool, Any, None]: 149 if request is not None: 150 return self.resolve_request(request) 151 return True, None, None 152 153 def full_processing_iteration(self): 154 handlers = self.handlers 155 waiters = self.waiters 156 for priority in self.priority_sequence: 157 priority_events = self.events[priority] 158 if not priority_events: 159 continue 160 else: 161 interested_events = waiters.keys() & priority_events.keys() 162 for event in interested_events: 163 event_data_list: Deque = priority_events[event] 164 for i in range(len(event_data_list)): 165 event_waiters = waiters.pop(event, None) 166 if event_waiters is None: 167 break 168 else: 169 for waiter 
in event_waiters: 170 waiter_events = self.events_by_waiter.get(waiter, None) 171 if waiter_events: 172 if event in waiter_events: 173 waiter_events.remove(event) 174 self.register_response(waiter, event_data_list.popleft()) 175 176 if not event_data_list: 177 del priority_events[event] 178 179 interested_events = handlers.keys() & priority_events.keys() 180 181 if interested_events: 182 processed_priority_events = set() 183 handlers_info_list = list() 184 for event in interested_events: 185 event_data_list = priority_events[event] 186 processed_priority_events.add(event) 187 for data in event_data_list: 188 event_handlers = handlers.get(event, None) 189 if event_handlers: 190 for handler in event_handlers: 191 handlers_info_list.append((handler, event, data)) 192 193 async def event_processing_coro( 194 interface: Interface, 195 handlers_info_list: List[Tuple[(Handler, EventID, Any)]], 196 priority: CoroPriority, 197 ): 198 loop: CoroSchedulerType = interface._loop 199 current_coro_interface_buff: Interface = loop.current_coro_interface 200 ly = await agly(priority) 201 for handler, event, data in handlers_info_list: 202 try: 203 loop.current_coro_interface = None 204 handler(event, data) 205 except: 206 loop.logger.exception('AsyncEventBus. 
Event handler error') 207 finally: 208 loop.current_coro_interface = current_coro_interface_buff 209 210 await ly() 211 212 try: 213 coro: CoroWrapperBase = self._loop.put_coro( 214 event_processing_coro, 215 handlers_info_list, 216 priority, 217 ) 218 # coro.is_background_coro = True # must not be background coro: it is not an endless coro 219 except: 220 ex_type, exception, tracback = sys.exc_info() 221 raise 222 finally: 223 for processed_event in processed_priority_events: 224 del priority_events[processed_event] 225 226 self.make_dead() 227 228 def _on_register_handler(self, event: EventID, handler: Handler): 229 if event not in self.handlers: 230 self.handlers[event] = set() 231 232 self.handlers[event].add(handler) 233 self.make_live() 234 return True, None, None 235 236 def _on_remove_handler(self, event: EventID, handler: Handler): 237 if event in self.handlers: 238 self.handlers[event].discard(handler) 239 if not self.handlers[event]: 240 del self.handlers[event] 241 242 return True, None, None 243 244 def _on_send_event(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low): 245 events_of_priority = self.events[priority] 246 if event not in events_of_priority: 247 events_of_priority[event] = deque() 248 249 events_of_priority[event].append(data) 250 self.make_live() 251 252 return True, None, None 253 254 def _add_direct_request(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low) -> ValueExistence: 255 self._on_send_event(event, data, priority) 256 return (True, None) 257 258 def _on_get_event(self, event: EventID, default: Any = None) -> Tuple[bool, ValueExistence, None]: 259 for _, events in self.events.items(): 260 if event in events: 261 return (True, events[event].popleft(), None) 262 263 return (True, (False, default), None) 264 265 def _on_flush_event_data(self, event: EventID): 266 for _, events in self.events.items(): 267 if event in events: 268 del events[event] 269 return (True, 
True, None) 270 271 return (True, False, None) 272 273 def _on_wait(self, event: EventID): 274 requester_id = self.current_caller_coro_info.coro_id 275 requester = self.current_caller_coro_info.coro 276 requester.add_on_coro_del_handler(self._on_coro_del_handler) 277 if event not in self.waiters: 278 self.waiters[event] = set() 279 280 if requester_id not in self.events_by_waiter: 281 self.events_by_waiter[requester_id] = set() 282 283 self.waiters[event].add(requester_id) 284 self.events_by_waiter[requester_id].add(event) 285 return (False, None, None) 286 287 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 288 coro_id = coro.coro_id 289 if coro_id in self.events_by_waiter: 290 events = self.events_by_waiter[coro_id] 291 for event in events: 292 if event in self.waiters: 293 if coro_id in self.waiters[event]: 294 self.waiters[event].remove(coro_id) 295 296 del self.events_by_waiter[coro_id] 297 298 return False 299 300 def in_work(self): 301 result: bool = bool(self.waiters) | bool(self.handlers) 302 for priority in self.priority_sequence: 303 result |= bool(self.events[priority]) 304 305 return self.thrifty_in_work(result) 306 307 308AsyncEventBusRequest.default_service_type = AsyncEventBus 309 310 311def try_send_async_event( 312 backup_scheduler: Optional[CoroSchedulerType], 313 event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low): 314 def event_sender( 315 interface: Interface, 316 event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low): 317 interface(AsyncEventBus, AsyncEventBusRequest().send_event(event, data, priority)) 318 319 try_put_coro_to(get_interface_and_loop_with_explicit_loop(backup_scheduler), event_sender, event, data, priority)
class
AsyncEventBus(cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
class AsyncEventBus(ServiceWithADirectRequestMixin, Service, EntityStatsMixin):
    """Scheduler service that queues events per priority and delivers them.

    Queued event data is handed out in two ways: to coroutines that issued a
    ``wait`` request for the event, and to plain callables registered as
    handlers for the event. Requests are dispatched through
    ``self._request_workers`` using the numeric codes produced by
    ``AsyncEventBusRequest``.

    NOTE(review): this class was recovered from a whitespace-collapsed listing;
    the nesting of ``full_processing_iteration`` is the most plausible
    reconstruction and should be confirmed against the original file.
    """

    def __init__(self, loop: CoroSchedulerType):
        """Create empty waiter/handler/event registries for the given scheduler."""
        super(AsyncEventBus, self).__init__(loop)

        # Request-type code (as saved by AsyncEventBusRequest) -> worker method.
        self._request_workers = {
            0: self._on_register_handler,
            1: self._on_remove_handler,
            2: self._on_send_event,
            3: self._on_get_event,
            4: self._on_wait,
            5: self._on_flush_event_data,
        }

        # event -> ids of coroutines blocked in ``wait`` on that event
        self.waiters: Dict[EventID, Set[CoroID]] = dict()
        # coroutine id -> events it is currently waiting for (reverse index)
        self.events_by_waiter: Dict[CoroID, Set[Any]] = dict()
        # event -> callables invoked as ``handler(event, data)``
        self.handlers: Dict[EventID, Set[Handler]] = dict()
        # priority -> event -> FIFO queue of not-yet-delivered data
        self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = {
            CoroPriority.low: dict(),
            CoroPriority.normal: dict(),
            CoroPriority.high: dict()
        }
        # Order in which full_processing_iteration() visits the priorities.
        self.priority_sequence = [
            CoroPriority.high,
            CoroPriority.normal,
            CoroPriority.low
        ]

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        """Return ``(class name, stats dict)`` describing queued events, waiters and handlers.

        :param stats_level: at ``StatsLevel.info`` handlers are described with
            ``full_func_info_to_printable_dict``; otherwise with
            ``full_func_info_to_dict``.
        """
        if EntityStatsMixin.StatsLevel.info == stats_level:
            func_info = full_func_info_to_printable_dict
        else:
            func_info = full_func_info_to_dict

        coroutines_set: Set = set()
        for coroutines in self.waiters.values():
            coroutines_set.update(coroutines)

        handlers_set: Set = set()
        for handlers in self.handlers.values():
            handlers_set.update(handlers)

        return type(self).__name__, {
            'events': {
                'low priority': {
                    'num': len(self.events[CoroPriority.low]),
                    'list': set(self.events[CoroPriority.low])
                },
                'normal priority': {
                    'num': len(self.events[CoroPriority.normal]),
                    'list': set(self.events[CoroPriority.normal])
                },
                'high priority': {
                    'num': len(self.events[CoroPriority.high]),
                    'list': set(self.events[CoroPriority.high])
                },
            },
            'waiting coroutines': {
                'num': len(coroutines_set),
                'list': coroutines_set,
            },
            'handlers': {
                'num': len(handlers_set),
                'list': [func_info(handler) for handler in handlers_set]
            }
        }

    def single_task_registration_or_immediate_processing(self, request: Optional[AsyncEventBusRequest]=None
                                                         ) -> Tuple[bool, Any, None]:
        """Resolve ``request`` through ``resolve_request``; with no request return ``(True, None, None)``."""
        if request is not None:
            return self.resolve_request(request)
        return True, None, None

    def full_processing_iteration(self):
        """Deliver queued data to waiters, then schedule handler calls, per priority.

        Priorities are visited in ``self.priority_sequence`` order; empty
        priority buckets are skipped. ``self.make_dead()`` is called at the end.
        """
        for priority in self.priority_sequence:
            priority_events = self.events[priority]
            if not priority_events:
                continue

            self._deliver_to_waiters(priority_events)
            self._schedule_handlers(priority, priority_events)

        self.make_dead()

    def _deliver_to_waiters(self, priority_events: Dict[EventID, Deque[Any]]) -> None:
        """Give each coroutine waiting on a queued event one queued data item."""
        waiters = self.waiters
        # The intersection is a new set, so ``waiters`` may be mutated below.
        for event in waiters.keys() & priority_events.keys():
            event_data_list: Deque = priority_events[event]
            event_waiters = waiters.pop(event, None)
            still_waiting = set()
            for waiter in event_waiters or ():
                if not event_data_list:
                    # More waiters than queued items: previously popleft() raised
                    # IndexError here and the remaining waiters were lost. Keep
                    # them registered so a later event can wake them.
                    still_waiting.add(waiter)
                    continue

                waiter_events = self.events_by_waiter.get(waiter, None)
                if waiter_events:
                    waiter_events.discard(event)
                    if not waiter_events:
                        # Do not keep an empty reverse-index entry around.
                        del self.events_by_waiter[waiter]

                self.register_response(waiter, event_data_list.popleft())

            if still_waiting:
                waiters[event] = still_waiting

            if not event_data_list:
                del priority_events[event]

    def _schedule_handlers(self, priority: CoroPriority, priority_events: Dict[EventID, Deque[Any]]) -> None:
        """Schedule one coroutine that calls every handler of every queued event."""
        handlers = self.handlers
        interested_events = handlers.keys() & priority_events.keys()
        if not interested_events:
            return

        # Snapshot (handler, event, data) now: the queues are removed right after
        # scheduling, and handlers may be added/removed before the coroutine runs.
        handlers_info_list = [
            (handler, event, data)
            for event in interested_events
            for data in priority_events[event]
            for handler in handlers[event]
        ]

        async def event_processing_coro(
            interface: Interface,
            handlers_info_list: List[Tuple[(Handler, EventID, Any)]],
            priority: CoroPriority,
        ):
            loop: CoroSchedulerType = interface._loop
            current_coro_interface_buff: Interface = loop.current_coro_interface
            ly = await agly(priority)
            for handler, event, data in handlers_info_list:
                try:
                    # The loop's current coroutine interface is cleared for the
                    # duration of the handler call and restored afterwards.
                    loop.current_coro_interface = None
                    handler(event, data)
                except Exception:
                    # One failing handler must not stop the remaining ones.
                    loop.logger.exception('AsyncEventBus. Event handler error')
                finally:
                    loop.current_coro_interface = current_coro_interface_buff

                await ly()

        try:
            # Must not be a background coro: it is not an endless coro.
            self._loop.put_coro(
                event_processing_coro,
                handlers_info_list,
                priority,
            )
        finally:
            # The queues are dropped whether or not scheduling succeeded.
            for processed_event in interested_events:
                del priority_events[processed_event]

    def _on_register_handler(self, event: EventID, handler: Handler):
        """Add ``handler`` to the handler set of ``event`` and mark the service live."""
        self.handlers.setdefault(event, set()).add(handler)
        self.make_live()
        return True, None, None

    def _on_remove_handler(self, event: EventID, handler: Handler):
        """Remove ``handler`` from ``event``; drop the event entry once it has no handlers."""
        event_handlers = self.handlers.get(event)
        if event_handlers is not None:
            event_handlers.discard(handler)
            if not event_handlers:
                del self.handlers[event]

        return True, None, None

    def _on_send_event(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
        """Append ``data`` to the queue of ``event`` under ``priority`` and mark the service live.

        ``priority=None`` is accepted (the annotation is ``Optional``) and is
        treated as ``CoroPriority.low``; previously it raised ``KeyError``.
        """
        if priority is None:
            priority = CoroPriority.low

        self.events[priority].setdefault(event, deque()).append(data)
        self.make_live()

        return True, None, None

    def _add_direct_request(self, event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low) -> ValueExistence:
        """Queue an event without going through a request object; returns ``(True, None)``."""
        self._on_send_event(event, data, priority)
        return (True, None)

    def _on_get_event(self, event: EventID, default: Any = None) -> Tuple[bool, ValueExistence, None]:
        """Pop the oldest queued data item of ``event`` without blocking.

        Priority buckets are searched in the insertion order of ``self.events``.
        On a hit the response is the popped data item itself; on a miss it is
        ``(False, default)``. (Shapes kept exactly as before for callers.)
        """
        for events in self.events.values():
            event_data_list = events.get(event)
            if event_data_list is None:
                continue

            if not event_data_list:
                # Stale empty queue: drop it instead of raising IndexError.
                del events[event]
                continue

            data = event_data_list.popleft()
            if not event_data_list:
                # An emptied queue left in place would make the next get_event
                # raise IndexError and keep in_work() true forever.
                del events[event]

            return (True, data, None)

        return (True, (False, default), None)

    def _on_flush_event_data(self, event: EventID):
        """Drop all queued data of ``event`` in every priority.

        The response is ``True`` when anything was dropped, else ``False``.
        Previously only the first priority bucket holding the event was flushed.
        """
        flushed = False
        for events in self.events.values():
            if events.pop(event, None) is not None:
                flushed = True

        return (True, flushed, None)

    def _on_wait(self, event: EventID):
        """Register the calling coroutine as a waiter for ``event``.

        Returns ``(False, None, None)``; the data is delivered later through
        ``register_response`` from ``full_processing_iteration``.
        """
        requester_id = self.current_caller_coro_info.coro_id
        requester = self.current_caller_coro_info.coro
        # Lets the bus forget the coroutine if it is deleted while waiting.
        requester.add_on_coro_del_handler(self._on_coro_del_handler)

        self.waiters.setdefault(event, set()).add(requester_id)
        self.events_by_waiter.setdefault(requester_id, set()).add(event)
        return (False, None, None)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Remove a deleted coroutine from every waiter set; always returns ``False``."""
        coro_id = coro.coro_id
        events = self.events_by_waiter.pop(coro_id, None)
        for event in events or ():
            event_waiters = self.waiters.get(event)
            if event_waiters is None:
                continue

            event_waiters.discard(coro_id)
            if not event_waiters:
                # An empty set left behind would keep in_work() true.
                del self.waiters[event]

        return False

    def in_work(self):
        """Report via ``thrifty_in_work`` whether any waiter, handler or queued event exists."""
        result: bool = bool(self.waiters) | bool(self.handlers)
        for priority in self.priority_sequence:
            result |= bool(self.events[priority])

        return self.thrifty_in_work(result)
AsyncEventBus( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
def __init__(self, loop: CoroSchedulerType):
    """Set up the request dispatch table and the empty event/waiter/handler registries.

    :param loop: scheduler instance, forwarded unchanged to the parent initializer.
    """
    super(AsyncEventBus, self).__init__(loop)

    # Numeric request type -> method that serves it.
    self._request_workers = {
        0: self._on_register_handler,
        1: self._on_remove_handler,
        2: self._on_send_event,
        3: self._on_get_event,
        4: self._on_wait,
        5: self._on_flush_event_data,
    }

    # Coroutines blocked on an event, indexed both ways.
    self.waiters: Dict[EventID, Set[CoroID]] = {}
    self.events_by_waiter: Dict[CoroID, Set[Any]] = {}

    # Callables registered per event.
    self.handlers: Dict[EventID, Set[Handler]] = {}

    # One event -> data-queue mapping per priority (key order: low, normal, high).
    self.events: Dict[CoroPriority, Dict[EventID, Deque[Any]]] = {
        priority: {}
        for priority in (CoroPriority.low, CoroPriority.normal, CoroPriority.high)
    }

    # Processing order: highest priority first.
    self.priority_sequence = [CoroPriority.high, CoroPriority.normal, CoroPriority.low]
events: Dict[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, Dict[str, Deque[Any]]]
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
    """Summarize the bus state as ``(class name, stats dict)``.

    The stats dict reports, per priority, how many distinct events are queued
    and which ones; the set of waiting coroutine ids; and a description of
    every registered handler.

    :param stats_level: ``StatsLevel.info`` selects
        ``full_func_info_to_printable_dict`` for describing handlers; any other
        level selects ``full_func_info_to_dict``.
    """
    describe_handler = (
        full_func_info_to_printable_dict
        if EntityStatsMixin.StatsLevel.info == stats_level
        else full_func_info_to_dict
    )

    waiting_coros: Set = set()
    for waiter_group in self.waiters.values():
        waiting_coros |= waiter_group

    unique_handlers: Set = set()
    for handler_group in self.handlers.values():
        unique_handlers |= handler_group

    def queue_stats(priority):
        # Count and names of the events currently queued under one priority.
        queued = self.events[priority]
        return {'num': len(queued), 'list': set(queued)}

    stats = {
        'events': {
            'low priority': queue_stats(CoroPriority.low),
            'normal priority': queue_stats(CoroPriority.normal),
            'high priority': queue_stats(CoroPriority.high),
        },
        'waiting coroutines': {
            'num': len(waiting_coros),
            'list': waiting_coros,
        },
        'handlers': {
            'num': len(unique_handlers),
            'list': [describe_handler(handler) for handler in unique_handlers],
        },
    }
    return type(self).__name__, stats
def
single_task_registration_or_immediate_processing( self, request: Union[AsyncEventBusRequest, NoneType] = None) -> Tuple[bool, Any, NoneType]:
def
full_processing_iteration(self):
def full_processing_iteration(self):
    """Deliver queued events to waiting coroutines and to registered handlers.

    Priorities are visited in ``self.priority_sequence`` order. For each
    non-empty priority, queued data is first handed to the coroutines waiting
    on the event via ``register_response``; then, for every event that has
    handlers, the queued data is passed to a newly scheduled coroutine that
    calls the handlers, and those event queues are removed. Finally
    ``self.make_dead()`` is called.

    NOTE(review): this listing was recovered from a whitespace-collapsed
    rendering, so the nesting below is the most plausible reconstruction —
    confirm it against the original file.
    """
    handlers = self.handlers
    waiters = self.waiters
    for priority in self.priority_sequence:
        priority_events = self.events[priority]
        if not priority_events:
            continue
        else:
            # Events that have both queued data and at least one waiter entry.
            interested_events = waiters.keys() & priority_events.keys()
            for event in interested_events:
                event_data_list: Deque = priority_events[event]
                for i in range(len(event_data_list)):
                    # The first pass removes the whole waiter set, so a second
                    # pass finds nothing and breaks out.
                    event_waiters = waiters.pop(event, None)
                    if event_waiters is None:
                        break
                    else:
                        for waiter in event_waiters:
                            # Keep the reverse index in sync with ``waiters``.
                            waiter_events = self.events_by_waiter.get(waiter, None)
                            if waiter_events:
                                if event in waiter_events:
                                    waiter_events.remove(event)
                            # NOTE(review): one item is popped per waiter; with
                            # more waiters than queued items this looks like it
                            # would raise IndexError — confirm.
                            self.register_response(waiter, event_data_list.popleft())

                if not event_data_list:
                    del priority_events[event]

            # Events that still have queued data and at least one handler.
            interested_events = handlers.keys() & priority_events.keys()

            if interested_events:
                processed_priority_events = set()
                handlers_info_list = list()
                for event in interested_events:
                    event_data_list = priority_events[event]
                    processed_priority_events.add(event)
                    for data in event_data_list:
                        event_handlers = handlers.get(event, None)
                        if event_handlers:
                            for handler in event_handlers:
                                handlers_info_list.append((handler, event, data))

                async def event_processing_coro(
                    interface: Interface,
                    handlers_info_list: List[Tuple[(Handler, EventID, Any)]],
                    priority: CoroPriority,
                ):
                    loop: CoroSchedulerType = interface._loop
                    current_coro_interface_buff: Interface = loop.current_coro_interface
                    ly = await agly(priority)
                    for handler, event, data in handlers_info_list:
                        try:
                            # The loop's current coroutine interface is cleared
                            # while the handler runs and restored in ``finally``.
                            loop.current_coro_interface = None
                            handler(event, data)
                        except:
                            # A failing handler is logged; the loop continues.
                            loop.logger.exception('AsyncEventBus. Event handler error')
                        finally:
                            loop.current_coro_interface = current_coro_interface_buff

                        await ly()

                try:
                    coro: CoroWrapperBase = self._loop.put_coro(
                        event_processing_coro,
                        handlers_info_list,
                        priority,
                    )
                    # coro.is_background_coro = True # must not be background coro: it is not an endless coro
                except:
                    # NOTE(review): these locals are never read; the exception is re-raised as is.
                    ex_type, exception, tracback = sys.exc_info()
                    raise
                finally:
                    # The processed queues are removed whether or not scheduling succeeded.
                    for processed_event in processed_priority_events:
                        del priority_events[processed_event]

    self.make_dead()
def
in_work(self):
def in_work(self):
    """Tell the scheduler whether the bus currently has anything to look after.

    The flag passed to ``thrifty_in_work`` is true when at least one waiter
    entry, one handler entry, or one queued event (in any priority) exists.

    :return: whatever ``self.thrifty_in_work`` returns for that flag.
    """
    # Evaluate every priority bucket (no short-circuit), as a list of booleans.
    queued_flags = [bool(self.events[priority]) for priority in self.priority_sequence]
    busy: bool = bool(self.waiters) or bool(self.handlers) or any(queued_flags)
    return self.thrifty_in_work(busy)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
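Raises: NotImplementedError: presumably only by the base Service.in_work when a subclass does not override it (inherited note — confirm); the override shown above contains no raise statement.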
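Returns: bool: the value returned by thrifty_in_work(result), where result is True when any coroutine is waiting, any handler is registered, or any event is queued.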
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class
AsyncEventBusRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class AsyncEventBusRequest(ServiceRequest):
    """Request builder for the ``AsyncEventBus`` service.

    Every method records a numeric request type together with its arguments
    through ``self._save`` and returns whatever ``_save`` returns. The return
    annotations presumably describe the service's response to the request
    rather than the method's own return value — confirm against
    ``ServiceRequest``.
    """

    def register_handler(self, event: EventID, handler: Handler) -> None:
        """Request type 0: register ``handler`` for ``event``."""
        request_type = 0
        return self._save(request_type, event, handler)

    def remove_handler(self, event: EventID, handler: Handler) -> None:
        """Request type 1: remove ``handler`` from ``event``."""
        request_type = 1
        return self._save(request_type, event, handler)

    def send_event(
        self,
        event: EventID,
        data: Any,
        priority: Optional[CoroPriority]=CoroPriority.low,
    ) -> None:
        """Request type 2: queue ``data`` for ``event`` under ``priority``."""
        request_type = 2
        return self._save(request_type, event, data, priority)

    def get_event(self, event: EventID, default: Any = None) -> ValueExistence[Any]:
        """Request type 3: fetch queued data of ``event``, with ``default`` as the fallback."""
        request_type = 3
        return self._save(request_type, event, default)

    def flush_event_data(self, event: EventID) -> ValueExistence[Any]:
        """Request type 5: discard queued data of ``event``."""
        request_type = 5
        return self._save(request_type, event)

    def wait(self, event: EventID) -> Any:
        """
        Request type 4: will block the coroutine until the result is ready
        :param event:
        :return: event data
        """
        request_type = 4
        return self._save(request_type, event)
def
send_event( self, event: str, data: Any, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = <CoroPriority.low: 2>) -> None:
def
get_event( self, event: str, default: Any = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Any]:
def
flush_event_data( self, event: str) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Any]:
def
wait(self, event: str) -> Any:
def wait(self, event: EventID) -> Any:
    """
    Will block the coroutine until the result is ready
    :param event: identifier of the event to wait for
    :return: event data
    """
    # 4 is the request type for "wait".
    wait_request_type = 4
    return self._save(wait_request_type, event)
Will block the coroutine until the result is ready :param event: :return: event data
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'AsyncEventBus'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
EventID =
<class 'str'>
Handler =
typing.Callable[[str, typing.Any], NoneType]
def
try_send_async_event( backup_scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], event: str, data: Any, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = <CoroPriority.low: 2>):
def try_send_async_event(
        backup_scheduler: Optional[CoroSchedulerType],
        event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
    """Schedule a small coroutine that sends ``event`` to the ``AsyncEventBus``.

    The target is obtained from
    ``get_interface_and_loop_with_explicit_loop(backup_scheduler)`` and the
    sender is handed to ``try_put_coro_to``. Nothing is returned.

    :param backup_scheduler: passed to ``get_interface_and_loop_with_explicit_loop``.
    :param event: event identifier.
    :param data: payload queued for the event.
    :param priority: priority the event is queued under.
    """
    def event_sender(
            interface: Interface,
            event: EventID, data: Any, priority: Optional[CoroPriority]=CoroPriority.low):
        # Build the send request first, then submit it to the bus service.
        send_request = AsyncEventBusRequest().send_event(event, data, priority)
        interface(AsyncEventBus, send_request)

    target = get_interface_and_loop_with_explicit_loop(backup_scheduler)
    try_put_coro_to(target, event_sender, event, data, priority)