cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = [ 38 'Counter', 'Iterable', 'ServiceResponseTypeVar', 'ServiceType', 'TypedServiceType', 'NormalizableServiceType', 39 'ItemID', 'CoroID', 'Worker', 'GreenetCoro', 'ACoro', 'Coro', 'OnCoroDelHandler', 'cs_coro', 'cs_acoro', 'cs_callable', 'cs_acallable', 40 'OutsideCoroSchedulerContext', 'current_coro_scheduler', 'get_current_coro_scheduler', 'set_primary_coro_scheduler', 41 'PrimaryCoroSchedulerWasNotSet', 'primary_coro_scheduler', 'get_primary_coro_scheduler', 'CoroSchedulerContextIsNotAvailable', 42 'available_coro_scheduler', 'get_available_coro_scheduler', 'WrongTypeOfShedulerError', 'InterfaceIsNotAvailableError', 43 'CurrentCoroIsNotAliveError', 'loop_with_backup_loop', 'get_loop_with_backup_loop', 'loop_with_explicit_loop', 44 'get_loop_with_explicit_loop', 'interface_and_loop_with_backup_loop', 'get_interface_and_loop_with_backup_loop', 45 'interface_and_loop_with_explicit_loop', 'get_interface_and_loop_with_explicit_loop', 'interface_for_an_explicit_loop', 46 'get_interface_for_an_explicit_loop', 'service_with_backup_loop', 'get_service_with_backup_loop', 'service_with_explicit_loop', 47 'get_service_with_explicit_loop', 'service_fast_with_backup_loop', 'get_service_fast_with_backup_loop', 48 'service_fast_with_explicit_loop', 'get_service_fast_with_explicit_loop', 'CoroType', 'ExplicitWorker', 'AnyWorker', 49 'CoroScheduler', 'CoroSchedulerType' , 'CoroSchedulerGreenlet', 'CoroSchedulerAwaitable', 'current_interface', 'execute_coro', 'exec_coro', 'ecoro', 'aexecute_coro', 'aexec_coro', 'aecoro', 50 'around_await', 'Request', 'Response', 'DirectResponse', 'Interface', 'InterfaceGreenlet', 'find_coro_type', 51 'InterfaceAsyncAwait', 'InterfaceFake', 'InterfaceFakeAsyncAwait', 'CallerCoroInfo', 'ServiceRequest', 'TypedServiceRequest', 52 'ServiceRequestMethodMixin', 'DualImmediateProcessingServiceMixin', 'WrongServiceRequestError', 'Service', 'TypedService', 53 'CoroWrapperBase', 'CoroWrapperGreenlet', 'CoroWrapperAsyncAwait', 'dlog', 'log_exception_traceback_info', 54 'log_uncatched_exception', 'func_info', 'ServiceProcessingResultExists', 'ServiceProcessingResult', 'ServiceProcessingException', 55 'ServiceProcessingResponse', 'full_func_info_to_dict', 'full_func_info_to_printable_dict', 'GreenletWorkerWrapper', 56 'EntityStatsMixin', 'CoroSchedulerIsCurrentlyDestroingError', 'CoroSchedulerDestroyException', 57 'CoroSchedulerDestroyRequestedException', 'greenlet_awailable', 'GreenletExit' 58] 59 60import sys 61import os 62import inspect 63from contextlib import contextmanager 64from typing import Coroutine, Dict, Tuple, List, Callable, Awaitable, Any, Optional, Type, Set, Union, Generator, AsyncGenerator, overload, Generic, TypeVar 65# try: 66# from typing import TypeAlias, ParamSpec, Concatenate 67# except ImportError: 68# from typing_extensions import TypeAlias, ParamSpec, Concatenate 69from typing_extensions import TypeAlias, ParamSpec, Concatenate 70from enum import Enum 71import types 72from cengal.time_management.cpu_clock_cycles import perf_counter 73import traceback 74from contextlib import contextmanager 75import logging 76from datetime import datetime 77from cengal.introspection.inspect import get_exception, exception_to_printable_text, is_async 78from cengal.code_flow_control.args_manager import args_kwargs 79from threading import local 80from cengal.time_management.sleep_tools import sleep 81from collections import deque 82from cengal.code_flow_control.args_manager import EntityArgsHolder, EntityArgsHolderExplicit 83from cengal.time_management.cpu_clock_cycles import cpu_clock_cycles 84from cengal.system import PYTHON_VERSION_INT 85from functools import wraps, update_wrapper 86 87 88greenlet_awailable: bool = True 89try: 90 from greenlet import greenlet, GreenletExit 91except ImportError: 92 greenlet_awailable = False 93 class GreenletExit(Exception): 94 pass 95 96# greenlet_awailable = False 97 98class Counter: 99 def __init__(self): 100 self._index = -1 # type: int 101 102 def get(self) -> int: 103 self._index += 1 104 return self._index 105 106 107class Iterable: 108 def iteration(self) -> bool: 109 """ 110 should return False if ready to stop looping 111 :return: 112 """ 113 raise NotImplementedError 114 115 116ServiceResponseTypeVar = TypeVar('ServiceResponseTypeVar') 117 118 119ServiceType = Type['Service'] 120TypedServiceType = Type['TypedService[ServiceResponseTypeVar]'] 121NormalizableServiceType = Union[Type['Service'], Type['TypedService[ServiceResponseTypeVar]'], Type['ServiceRequest'], Type['TypedServiceRequest[ServiceResponseTypeVar]'], 'Service', 'TypedService[ServiceResponseTypeVar]', 'ServiceRequest', 'TypedServiceRequest[ServiceResponseTypeVar]'] 122ItemID = int 123CoroID = ItemID 124# Worker = Callable[['Interface'], Any] 125if sys.version_info >= (3, 10): 126 GreenletWoker = Callable[['Interface', ...], Any] 127 AsyncWoker = Callable[['Interface', ...], Awaitable[Any]] 128 Worker = Union[GreenletWoker, AsyncWoker] 129else: 130 GreenletWoker = Callable[['Interface'], Any] 131 AsyncWoker = Callable[['Interface'], Awaitable[Any]] 132 Worker = Union[GreenletWoker, AsyncWoker] 133 134GreenetCoro = greenlet 135ACoro = Union[Awaitable, Coroutine, Generator, AsyncGenerator, Callable] 136Coro = Union[GreenetCoro, ACoro] 137OnCoroDelHandler = Callable[['CoroWrapperBase'], bool] 138 139 140CoroParams = ParamSpec("CoroParams") 141CoroResult = TypeVar("CoroResult") 142 143 144def cs_coro(coro_worker: Callable[CoroParams, CoroResult]) -> Callable[Concatenate['Interface', CoroParams], CoroResult]: 145 """Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter) 146 147 Args: 148 coro_worker (Callable): _description_ 149 """ 150 if is_async(coro_worker): 151 async def wrapper(i: Interface, *args, **kwargs): 152 return await coro_worker(*args, **kwargs) 153 154 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 155 update_wrapper(wrapper, coro_worker) 156 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 157 # wrapper.__name__ = coro_worker.__name__ 158 wrapper: coro_worker 159 return wrapper 160 else: 161 def wrapper(i: Interface, *args, **kwargs): 162 return coro_worker(*args, **kwargs) 163 164 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 165 update_wrapper(wrapper, coro_worker) 166 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 167 # wrapper.__name__ = coro_worker.__name__ 168 wrapper: coro_worker 169 return wrapper 170 171 172def cs_acoro(coro_aworker: Callable[CoroParams, Awaitable[CoroResult]]) -> Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]: 173 """Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter) 174 175 Args: 176 coro_aworker (Callable): _description_ 177 """ 178 if is_async(coro_aworker): 179 async def wrapper(i: Interface, *args, **kwargs): 180 return await coro_aworker(*args, **kwargs) 181 182 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 183 update_wrapper(wrapper, coro_aworker) 184 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 185 # wrapper.__name__ = coro_aworker.__name__ 186 wrapper: coro_aworker 187 return wrapper 188 else: 189 def wrapper(i: Interface, *args, **kwargs): 190 return coro_aworker(*args, **kwargs) 191 192 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 193 update_wrapper(wrapper, coro_aworker) 194 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 195 wrapper.__name__ = coro_aworker.__name__ 196 wrapper: coro_aworker 197 return wrapper 198 199 200def cs_callable(coro_worker: Callable[Concatenate['Interface', CoroParams], CoroResult]) -> Callable[CoroParams, CoroResult]: 201 """Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine 202 203 Args: 204 coro_worker (Callable): _description_ 205 """ 206 if is_async(coro_worker): 207 async def wrapper(*args, **kwargs): 208 i: Interface = current_interface() 209 return await coro_worker(i, *args, **kwargs) 210 211 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 212 update_wrapper(wrapper, coro_worker) 213 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 214 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 215 # wrapper.__name__ = coro_worker.__name__ 216 wrapper: coro_worker 217 return wrapper 218 else: 219 def wrapper(*args, **kwargs): 220 i: Interface = current_interface() 221 return coro_worker(i, *args, **kwargs) 222 223 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 224 update_wrapper(wrapper, coro_worker) 225 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 226 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 227 # wrapper.__name__ = coro_worker.__name__ 228 wrapper: coro_worker 229 return wrapper 230 231 232def cs_acallable(coro_aworker: Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]) -> Callable[CoroParams, Awaitable[CoroResult]]: 233 """Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine 234 235 Args: 236 coro_aworker (Callable): _description_ 237 """ 238 if is_async(coro_aworker): 239 async def wrapper(*args, **kwargs): 240 i: Interface = current_interface() 241 return await coro_aworker(i, *args, **kwargs) 242 243 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 244 update_wrapper(wrapper, coro_aworker) 245 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 246 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 247 # wrapper.__name__ = coro_aworker.__name__ 248 wrapper: coro_aworker 249 return wrapper 250 else: 251 def wrapper(*args, **kwargs): 252 i: Interface = current_interface() 253 return coro_aworker(i, *args, **kwargs) 254 255 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 256 update_wrapper(wrapper, coro_aworker) 257 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 258 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 259 # wrapper.__name__ = coro_aworker.__name__ 260 wrapper: coro_aworker 261 return wrapper 262 263 264class _ThreadLocalCoroScheduler(local): 265 scheduler: Optional['CoroSchedulerType'] = None 266 267 268_primary_coro_scheduler: _ThreadLocalCoroScheduler = _ThreadLocalCoroScheduler() 269_current_coro_scheduler: _ThreadLocalCoroScheduler = _ThreadLocalCoroScheduler() 270 271 272_debug_log_counter = 0 273 274 275def _debug_log(*args, **kwargs): 276 print(*args, **kwargs) 277 global _debug_log_counter 278 _debug_log_counter += 1 279 280 281def _fake_debug_log(*args, **kwargs): 282 pass 283 284 285dlog = _fake_debug_log 286if False and __debug__: 287 dlog = _debug_log 288 289 290def log_exception_traceback_info(): 291 exception = sys.exc_info() 292 formattedTraceback = traceback.format_exception(exception[0], exception[1], exception[2]) 293 exception = exception[:2] + (formattedTraceback,) 294 trace = '' 295 for line in exception[2]: 296 trace += line 297 if __debug__: dlog(trace, file=sys.stderr) 298 if __debug__: dlog(exception[0]) 299 if __debug__: dlog(exception[1]) 300 301 302@contextmanager 303def log_uncatched_exception(): 304 try: 305 yield 306 except: 307 log_exception_traceback_info() 308 raise 309 310 311def func_info(func, full_name: Optional[bool]=True): 312 if full_name: 313 # return f'{func.__class__}({func.__module__}.{func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}' 314 return f'{func.__class__}({func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}' 315 else: 316 return f'{func.__class__}({func.__qualname__}) @ {os.path.basename(func.__code__.co_filename)}:{func.__code__.co_firstlineno}' 317 318 319def full_func_info(my): 320 if __debug__: dlog(repr(my), 321 my.__module__, 322 my.__name__, 323 my.__qualname__, 324 my.__annotations__, 325 my.__class__, 326 my.__closure__, 327 my.__code__, 328 my.__code__.co_argcount, 329 my.__code__.co_cellvars, 330 my.__code__.co_code, 331 my.__code__.co_consts, 332 my.__code__.co_filename, 333 my.__code__.co_firstlineno, 334 my.__code__.co_flags, 335 my.__code__.co_freevars, 336 my.__code__.co_kwonlyargcount, 337 my.__code__.co_lnotab, 338 my.__code__.co_name, 339 my.__code__.co_names, 340 my.__code__.co_nlocals, 341 my.__code__.co_stacksize, 342 my.__code__.co_varnames, 343 ) 344 345 346def full_func_info_to_dict(my): 347 return { 348 'repr': repr(my), 349 'module': str(my.__module__), 350 'name': str(my.__name__), 351 'qualname': str(my.__qualname__), 352 'annotations': str(my.__annotations__), 353 'class': str(my.__class__), 354 'closure': str(my.__closure__), 355 'code': str(my.__code__), 356 'co_argcount': str(my.__code__.co_argcount), 357 'co_cellvars': str(my.__code__.co_cellvars), 358 'co_code': str(my.__code__.co_code), 359 'co_consts': str(my.__code__.co_consts), 360 'co_filename': str(my.__code__.co_filename), 361 'co_firstlineno': str(my.__code__.co_firstlineno), 362 'co_flags': str(my.__code__.co_flags), 363 'co_freevars': str(my.__code__.co_freevars), 364 'co_kwonlyargcount': str(my.__code__.co_kwonlyargcount), 365 'co_lnotab': str(my.__code__.co_lnotab), 366 'co_name': str(my.__code__.co_name), 367 'co_names': str(my.__code__.co_names), 368 'co_nlocals': str(my.__code__.co_nlocals), 369 'co_stacksize': str(my.__code__.co_stacksize), 370 'co_varnames': str(my.__code__.co_varnames), 371 } 372 373 374def full_func_info_to_printable_dict(func: Callable) -> Dict[str, str]: 375 func_info: Dict[str, str] = full_func_info_to_dict(func) 376 good_keys = {'module', 'qualname', 'class', 'co_filename', 'co_firstlineno'} 377 result: Dict[str, str] = dict() 378 for key, value in func_info.items(): 379 if key in good_keys: 380 result[key] = value 381 382 return result 383 384 385class OutsideCoroSchedulerContext(Exception): 386 pass 387 388 389def current_coro_scheduler() -> 'CoroSchedulerType': 390 if _current_coro_scheduler.scheduler is None: 391 raise OutsideCoroSchedulerContext 392 393 return _current_coro_scheduler.scheduler 394 395 396def get_current_coro_scheduler() -> Optional['CoroSchedulerType']: 397 return _current_coro_scheduler.scheduler 398 399 400def set_primary_coro_scheduler(coro_scheduler: 'CoroSchedulerType'): 401 _primary_coro_scheduler.scheduler = coro_scheduler 402 403 404class PrimaryCoroSchedulerWasNotSet(Exception): 405 pass 406 407 408def primary_coro_scheduler() -> 'CoroSchedulerType': 409 if _primary_coro_scheduler.scheduler is None: 410 raise PrimaryCoroSchedulerWasNotSet 411 412 return _primary_coro_scheduler.scheduler 413 414 415def get_primary_coro_scheduler() -> Optional['CoroSchedulerType']: 416 return _primary_coro_scheduler.scheduler 417 418 419class CoroSchedulerContextIsNotAvailable(Exception): 420 pass 421 422 423def available_coro_scheduler() -> 'CoroSchedulerType': 424 if _current_coro_scheduler.scheduler: 425 return _current_coro_scheduler.scheduler 426 elif _primary_coro_scheduler.scheduler: 427 return _primary_coro_scheduler.scheduler 428 else: 429 raise CoroSchedulerContextIsNotAvailable 430 431 432def get_available_coro_scheduler() -> Optional['CoroSchedulerType']: 433 if _current_coro_scheduler.scheduler: 434 return _current_coro_scheduler.scheduler 435 elif _primary_coro_scheduler.scheduler: 436 return _primary_coro_scheduler.scheduler 437 else: 438 return None 439 440 441class WrongTypeOfShedulerError(Exception): 442 pass 443 444 445class InterfaceIsNotAvailableError(Exception): 446 pass 447 448 449class CurrentCoroIsNotAliveError(Exception): 450 pass 451 452 453# ========================================================== 454 455 456def loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType': 457 if backup_scheduler is not None: 458 if not isinstance(backup_scheduler, CoroScheduler): 459 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 460 461 loop = CoroScheduler.current_loop() 462 if loop is None: 463 # Outside the loop 464 loop = get_available_coro_scheduler() 465 if loop is None: 466 loop = backup_scheduler 467 468 if loop is None: 469 raise CoroSchedulerContextIsNotAvailable 470 471 return loop 472 473 474def get_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']: 475 if backup_scheduler is not None: 476 if not isinstance(backup_scheduler, CoroScheduler): 477 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 478 479 loop = CoroScheduler.current_loop() 480 if loop is None: 481 # Outside the loop 482 loop = get_available_coro_scheduler() 483 if loop is None: 484 loop = backup_scheduler 485 486 return loop 487 488 489def loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType': 490 loop = explicit_scheduler 491 current_loop = CoroScheduler.current_loop() 492 if loop is None: 493 loop = current_loop 494 if loop is None: 495 # Outside the loop 496 loop = get_available_coro_scheduler() 497 else: 498 if not isinstance(loop, CoroScheduler): 499 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 500 501 if loop is None: 502 raise CoroSchedulerContextIsNotAvailable 503 504 return loop 505 506 507def get_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']: 508 loop = explicit_scheduler 509 current_loop = CoroScheduler.current_loop() 510 if loop is None: 511 loop = current_loop 512 if loop is None: 513 # Outside the loop 514 loop = get_available_coro_scheduler() 515 else: 516 if not isinstance(loop, CoroScheduler): 517 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 518 519 return loop 520# ========================================================== 521 522 523def interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]: 524 if backup_scheduler is not None: 525 if not isinstance(backup_scheduler, CoroScheduler): 526 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 527 528 loop = CoroScheduler.current_loop() 529 if loop is None: 530 # Outside the loop 531 interface = None 532 loop = get_available_coro_scheduler() 533 if loop is None: 534 loop = backup_scheduler 535 else: 536 # In the loop (in coroutine or in the service) 537 interface = loop.current_interface() 538 539 if loop is None: 540 raise CoroSchedulerContextIsNotAvailable 541 542 if interface is None: 543 raise InterfaceIsNotAvailableError 544 545 coro_alive: bool = False 546 if interface is not None: 547 if interface._coro: 548 coro_alive = True 549 550 if not coro_alive: 551 raise CurrentCoroIsNotAliveError 552 553 return loop, interface, coro_alive 554 555 556def get_interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 557 if backup_scheduler is not None: 558 if not isinstance(backup_scheduler, CoroScheduler): 559 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 560 561 loop = CoroScheduler.current_loop() 562 if loop is None: 563 # Outside the loop 564 interface = None 565 loop = get_available_coro_scheduler() 566 if loop is None: 567 loop = backup_scheduler 568 else: 569 # In the loop (in coroutine or in the service) 570 interface = loop.current_interface() 571 572 coro_alive: bool = False 573 if interface is not None: 574 if interface._coro: 575 coro_alive = True 576 577 return loop, interface, coro_alive 578 579 580def interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]: 581 loop = explicit_scheduler 582 current_loop = CoroScheduler.current_loop() 583 interface = None 584 if loop is None: 585 loop = current_loop 586 if loop is None: 587 # Outside the loop 588 loop = get_available_coro_scheduler() 589 else: 590 # In the loop (in coroutine or in the service) 591 interface = loop.current_interface() 592 else: 593 if isinstance(loop, CoroScheduler): 594 if loop is current_loop: 595 interface = loop.current_interface() 596 else: 597 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 598 599 if loop is None: 600 raise CoroSchedulerContextIsNotAvailable 601 602 if interface is None: 603 raise InterfaceIsNotAvailableError 604 605 coro_alive: bool = False 606 if interface is not None: 607 if interface._coro: 608 coro_alive = True 609 610 if not coro_alive: 611 raise CurrentCoroIsNotAliveError 612 613 return loop, interface, coro_alive 614 615 616def get_interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 617 loop = explicit_scheduler 618 current_loop = CoroScheduler.current_loop() 619 interface = None 620 if loop is None: 621 loop = current_loop 622 if loop is None: 623 # Outside the loop 624 loop = get_available_coro_scheduler() 625 else: 626 # In the loop (in coroutine or in the service) 627 interface = loop.current_interface() 628 else: 629 if isinstance(loop, CoroScheduler): 630 if loop is current_loop: 631 interface = loop.current_interface() 632 else: 633 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 634 635 coro_alive: bool = False 636 if interface is not None: 637 if interface._coro: 638 coro_alive = True 639 640 return loop, interface, coro_alive 641 642 643def interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple['CoroSchedulerType', 'Interface', bool]: 644 loop = explicit_scheduler 645 interface = None 646 if loop is None: 647 raise CoroSchedulerContextIsNotAvailable 648 else: 649 if isinstance(loop, CoroScheduler): 650 if loop is CoroScheduler.current_loop(): 651 interface = loop.current_interface() 652 else: 653 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 654 655 if interface is None: 656 raise InterfaceIsNotAvailableError 657 658 coro_alive: bool = False 659 if interface is not None: 660 if interface._coro: 661 coro_alive = True 662 663 if not coro_alive: 664 raise CurrentCoroIsNotAliveError 665 666 return loop, interface, coro_alive 667 668 669def get_interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 670 loop = explicit_scheduler 671 interface = None 672 if loop is not None: 673 if isinstance(loop, CoroScheduler): 674 if loop is CoroScheduler.current_loop(): 675 interface = loop.current_interface() 676 else: 677 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 678 679 coro_alive: bool = False 680 if interface is not None: 681 if interface._coro: 682 coro_alive = True 683 684 return loop, interface, coro_alive 685 686 687# ========================================================== 688 689def service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 690 if backup_scheduler is not None: 691 if not isinstance(backup_scheduler, CoroScheduler): 692 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 693 694 loop = CoroScheduler.current_loop() 695 if loop is None: 696 # Outside the loop 697 loop = get_available_coro_scheduler() 698 if loop is None: 699 loop = backup_scheduler 700 701 if loop is None: 702 raise CoroSchedulerContextIsNotAvailable 703 704 return loop.get_service_instance(service_type) 705 706 707def get_service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 708 if backup_scheduler is not None: 709 if not isinstance(backup_scheduler, CoroScheduler): 710 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 711 712 loop = CoroScheduler.current_loop() 713 if loop is None: 714 # Outside the loop 715 loop = get_available_coro_scheduler() 716 if loop is None: 717 loop = backup_scheduler 718 719 if loop is None: 720 return None 721 else: 722 return loop.get_service_instance(service_type) 723 724 725def service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 726 loop = explicit_scheduler 727 current_loop = CoroScheduler.current_loop() 728 if loop is None: 729 loop = current_loop 730 if loop is None: 731 # Outside the loop 732 loop = get_available_coro_scheduler() 733 else: 734 if not isinstance(loop, CoroScheduler): 735 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 736 737 if loop is None: 738 raise CoroSchedulerContextIsNotAvailable 739 740 return loop.get_service_instance(service_type) 741 742 743def get_service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 744 loop = explicit_scheduler 745 current_loop = CoroScheduler.current_loop() 746 if loop is None: 747 loop = current_loop 748 if loop is None: 749 # Outside the loop 750 loop = get_available_coro_scheduler() 751 else: 752 if not isinstance(loop, CoroScheduler): 753 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 754 755 if loop is None: 756 return None 757 else: 758 return loop.get_service_instance(service_type) 759# ========================================================== 760 761 762def service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 763 if backup_scheduler is not None: 764 if not isinstance(backup_scheduler, CoroScheduler): 765 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 766 767 loop = CoroScheduler.current_loop() 768 if loop is None: 769 # Outside the loop 770 loop = get_available_coro_scheduler() 771 if loop is None: 772 loop = backup_scheduler 773 774 if loop is None: 775 raise CoroSchedulerContextIsNotAvailable 776 777 return loop.get_service_instance_fast(service_type) 778 779 780def get_service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 781 if backup_scheduler is not None: 782 if not isinstance(backup_scheduler, CoroScheduler): 783 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 784 785 loop = CoroScheduler.current_loop() 786 if loop is None: 787 # Outside the loop 788 loop = get_available_coro_scheduler() 789 if loop is None: 790 loop = backup_scheduler 791 792 if loop is None: 793 return None 794 else: 795 return loop.get_service_instance_fast(service_type) 796 797 798def service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 799 loop = explicit_scheduler 800 current_loop = CoroScheduler.current_loop() 801 if loop is None: 802 loop = current_loop 803 if loop is None: 804 # Outside the loop 805 loop = get_available_coro_scheduler() 806 else: 807 if not isinstance(loop, CoroScheduler): 808 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 809 810 if loop is None: 811 raise CoroSchedulerContextIsNotAvailable 812 813 return loop.get_service_instance_fast(service_type) 814 815 816def get_service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 817 loop = explicit_scheduler 818 current_loop = CoroScheduler.current_loop() 819 if loop is None: 820 loop = current_loop 821 if loop is None: 822 # Outside the loop 823 loop = get_available_coro_scheduler() 824 else: 825 if not isinstance(loop, CoroScheduler): 826 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 827 828 if loop is None: 829 return None 830 else: 831 return loop.get_service_instance_fast(service_type) 832# ========================================================== 833 834 835class CoroType(Enum): 836 auto = 0 837 awaitable = 1 838 greenlet = 2 839 840 841class ExplicitWorker: 842 def __init__(self, coro_type: CoroType, worker: Worker) -> None: 843 self.coro_type: CoroType = coro_type 844 self.worker: Worker = worker 845 846 847AnyWorker = Union[ExplicitWorker, Worker] 848 849 850@types.coroutine 851def yield_task_from_asyncawait(request: 'Request') -> Generator['Response', Any, Any]: 852 response = yield request # type: Response 853 return response 854 855 856class EntityStatsMixin: 857 class StatsLevel(Enum): 858 info = 0 859 debug = 1 860 861 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel') -> Tuple[str, Dict[str, Any]]: 862 raise NotImplementedError 863 864 865class CoroSchedulerIsCurrentlyDestroingError(Exception): 866 pass 867 868 869class CoroSchedulerDestroyException(Exception): 870 pass 871 872 873class CoroSchedulerDestroyRequestedException(Exception): 874 pass 875 876 877class CoroSchedulerBase(Iterable, EntityStatsMixin): 878 def __init__(self, logger: Optional[logging.Logger] = None): 879 self.in_iteration = False 880 self.services = dict() # type: Dict[ServiceType, Service] 881 self.live_services = dict() # type: Dict[ServiceType, Service] 882 self.live_low_latency_services = dict() # type: Dict[ServiceType, Service] 883 self.requests = list() # type: List[Request] 884 self.responses = list() # type: List[Response] 885 self.new_born_coroutines = list() # type: List[CoroWrapperBase] 886 self.coroutines = dict() # type: Dict[CoroID, CoroWrapperBase] 887 self.coro_counter = Counter() # type: Counter 888 self.services_in_work = 0 # type: int 889 self.services_in_foreground_work: int = 0 890 self.services_in_active_state = 0 # type: int 891 self.services_in_active_state_list = list() # uncomment lines in code for a debutting purposes (when some service prevents loop to go to sleep) 892 self.time_left_before_next_event: Optional[float] = None # type: Optional[float] 893 self.coro_on_del_handlers = set() # type: Set[Callable] 894 self.current_coro_interface = None # type: Optional['Interface'] 895 self.current_coro_wrapper: CoroWrapperBase = None 896 self.suppress_coro_exceptions = True # type: bool 897 self.iteration_index: int = 0 898 self.context_switches = 0 # type: int 899 self.current_coro_start_time: Optional[float] = None 900 self.coro_execution_time: Dict[int, float] = dict() 901 self.coro_longest_execution_time: Dict[int, float] = dict() 902 self.loop_iteration_start_time = 0.0 903 self.need_to_measure_loop_iteration_time = False # type: bool 904 self.need_to_measure_coro_time = False # type: bool 905 self.need_to_gather_coro_history = False # type: bool 906 self.permitted_use_put_coro_from_coro = False # type: bool 907 self.get_coro_start_time = self._fake_perf_counter 908 self.get_loop_iteration_start_time = self._fake_perf_counter 909 self.coro_history_gatherer = self._fake_method 910 self.on_woke_up_callback = self._fake_method 911 self.coro_on_start_handlers = set() 912 self.global_on_start_handlers_turned_on = False 913 self.execute_global_on_start_handlers = self._fake_execute_global_on_start_handlers 914 self.coro_workers_history = dict() # type: Dict[Worker, Set[CoroID]] 915 self.coro_full_history = dict() # type: Dict[CoroID, CoroWrapperBase] 916 self.run_low_latency_services = self._fake_run_low_latency_services 917 if logger is None: 918 self.logger = logging.getLogger(f'cengal.coro_scheduler.{id(self)}') 919 self.logger_stream_handler = logging.StreamHandler() 920 self.logger.addHandler(self.logger_stream_handler) 921 self.logger.setLevel(logging.INFO) 922 else: 923 self.logger = logger 924 925 self._in_work: bool = None 926 self._destroyed: bool = False 927 self.services_impostrors: Dict = dict() 928 self.keep_coro_execution_time_between_iterations = False 929 self.keep_coro_longest_execution_time_between_iterations = False 930 self.keep_coro_workers_history_between_iterations = False 931 self.keep_coro_full_history_between_iterations = False 932 self.loop_iteration_delta_time = 0.0 933 self.suppress_warnings_about_responses_to_not_existant_coroutines: bool = False 934 self.use_internal_sleep: bool = True 935 self.high_cpu_utilisation_mode: bool = False # For an effective, low-latency, inter-thread switching when there is more active high-CPU utilizing threads than CPU cores. Should be switched off for GUI applications in order to to decrease CPU utilization. 936 self.foreground_coro_num: int = 0 937 self.on_idle_handlers: Set[Callable] = set() 938 self.idle_managers_num: int = 0 939 self.on_wrong_request: Optional[Callable] = None 940 self.get_service_by_type: Callable = self.get_service_by_type_impl 941 self.get_service_instance_fast: Callable = self.get_service_instance_fast_impl 942 self.get_service_instance: Callable = self.get_service_instance_impl 943 self.make_service_live_fast: Callable = self.make_service_live_fast_impl 944 self.make_service_live: Callable = self.make_service_live_impl 945 self.make_service_dead_fast: Callable = self.make_service_dead_fast_impl 946 self.make_service_dead: Callable = self.make_service_dead_impl 947 self.sliding_window = deque(maxlen=1000) 948 self.need_to_log_loop_start_and_end: bool = True 949 self.on_destroyed_handlers: Set[Callable] = set() 950 self._coroutines_can_switch_directly: bool = False 951 self._next_coroutine: Optional[CoroWrapperBase] = None 952 self._is_current_coro_was_new_born: Union[bool, None] = None 953 954 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 955 if EntityStatsMixin.StatsLevel.info == stats_level: 956 func_info = full_func_info_to_printable_dict 957 else: 958 func_info = full_func_info_to_dict 959 960 services_stats = dict() 961 for service in self.services.values(): 962 if isinstance(service, EntityStatsMixin): 963 name, info = service.get_entity_stats(stats_level) 964 services_stats[name] = info 965 966 return type(self).__name__, { 967 'loop': { 968 'services': { 969 'num': len(self.services), 970 'list': [service_type.__name__ for service_type in self.services.keys()], 971 }, 972 'live services': { 973 'in work num': self.services_in_work, 974 'in foreground work num': self.services_in_foreground_work, 975 'services in active state': self.services_in_active_state, 976 'idleble services': self.services_in_work - self.services_in_active_state, 977 'num': len(self.live_services), 978 'list': [service_type.__name__ for service_type in self.live_services.keys()], 979 }, 980 'live low latency services': { 981 'num': len(self.live_low_latency_services), 982 'list': [service_type.__name__ for service_type in self.live_low_latency_services.keys()], 983 }, 984 'new born coroutines': { 985 'num': len(self.new_born_coroutines), 986 "list": { 987 wrapper.coro_id: { 988 "func_info": func_info(self.purify_worker(wrapper.worker)), 989 "args": [repr(arg) for arg in wrapper.init_args], 990 "kwargs": { 991 repr(key): repr(value) 992 for key, value in wrapper.init_kwargs.items() 993 }, 994 } 995 for wrapper in self.new_born_coroutines 996 }, 997 }, 998 'coroutines': { 999 'num': len(self.coroutines), 1000 "list": { 1001 coro_id: { 1002 "func_info": func_info(self.purify_worker(wrapper.worker)), 1003 "args": [repr(arg) for arg in wrapper.init_args], 1004 "kwargs": { 1005 repr(key): repr(value) 1006 for key, value in wrapper.init_kwargs.items() 1007 }, 1008 } 1009 for coro_id, wrapper in self.coroutines.items() 1010 }, 1011 }, 1012 'coro counter': self.coro_counter._index, 1013 'coro on del handlers num': len(self.coro_on_del_handlers), 1014 'coro on del handlers': { 1015 'num': len(self.coro_on_del_handlers), 1016 'list': [func_info(handler) for handler in self.coro_on_del_handlers] 1017 }, 1018 'suppress coro exceptions': self.suppress_coro_exceptions, 1019 'context switches num': self.context_switches, 1020 'longest continuous execution time of coroutines': self.coro_longest_execution_time, 1021 'coroutines execution times': self.coro_execution_time, 1022 'need to measure loop iteration time': self.need_to_measure_loop_iteration_time, 1023 'need to measure coro time': self.need_to_measure_coro_time, 1024 'need to gather coro history': self.need_to_gather_coro_history, 1025 'coro workers history': { 1026 'num': len(self.coro_workers_history), 1027 'list': [{ 1028 'worker': func_info(self.purify_worker(worker)), 1029 'coroutines': coroutines, 1030 } for worker, coroutines in self.coro_workers_history.items()] 1031 }, 1032 'coro full history': { 1033 'num': len(self.coro_full_history), 1034 'list': {coro_id: { 1035 'worker': func_info(self.purify_worker(wrapper.worker)), 1036 'args': [repr(arg) for arg in wrapper.init_args], 1037 'kwargs': {repr(key): repr(value) for key, value in wrapper.init_kwargs.items()}, 1038 } for coro_id, wrapper in self.coro_full_history.items()} 1039 }, 1040 'coro loop iteration delta time':self.loop_iteration_delta_time, 1041 }, 1042 'services': services_stats, 1043 } 1044 1045 def loop(self): 1046 raise NotImplementedError 1047 1048 def iteration(self): 1049 raise NotImplementedError 1050 1051 def log_exc(self, coro: 'CoroWrapperBase', exception: Exception): 1052 worker_info = full_func_info_to_printable_dict(self.purify_worker(coro.worker)) 1053 error_text = f'\nAn exception in coroutine "{worker_info["qualname"]}"\n\tModule "{worker_info["module"]}"\n\tFile "{worker_info["co_filename"]}", line {worker_info["co_firstlineno"]}:\n{exception_to_printable_text(exception)}' 1054 self.logger.error(f'{datetime.now()} >> {error_text}') 1055 1056 # @property 1057 # def root_coro(self): 1058 # return self._root_coro 1059 1060 # @root_coro.setter 1061 # def root_coro(self, root_coro): 1062 # if root_coro is None: 1063 # if __debug__: dlog('root_coro = None') 1064 # else: 1065 # if __debug__: dlog(f'root_coro = {root_coro}') 1066 # self._root_coro = root_coro 1067 1068 def _fake_method(self, *args, **kwargs): 1069 pass 1070 1071 @staticmethod 1072 def _fake_perf_counter(): 1073 return 0.0 1074 1075 def get_service_instance_fast_impl(self, service_type: ServiceType): 1076 return self.services[service_type] 1077 1078 def get_service_instance_impl(self, service_type: ServiceType): 1079 self.register_service(service_type) 1080 return self.get_service_instance_fast_impl(service_type) 1081 1082 def make_service_live_fast_impl(self, service_type: ServiceType, is_low_latency: bool = False): 1083 service: Optional[Service] = self.services.get(service_type, None) 1084 if service is not None: 1085 self.live_services[service_type] = self.services[service_type] 1086 if is_low_latency: 1087 self.live_low_latency_services[service_type] = self.services[service_type] 1088 1089 def make_service_live_impl(self, service_type: ServiceType, is_low_latency: bool = False): 1090 self.register_service(service_type) 1091 self.make_service_live_fast_impl(service_type, is_low_latency) 1092 1093 def make_service_dead_fast_impl(self, service_type: ServiceType): 1094 self.live_services.pop(service_type, None) 1095 self.live_low_latency_services.pop(service_type, None) 1096 1097 def make_service_dead_impl(self, service_type: ServiceType): 1098 self.register_service(service_type) 1099 self.make_service_dead_fast_impl(service_type) 1100 1101 def get_service_instance_fast_impostor_impl(self, service_type: ServiceType): 1102 return self.get_service_instance_fast_impl(self.services_impostrors.get(service_type, service_type)) 1103 1104 def get_service_instance_impostor_impl(self, service_type: ServiceType): 1105 return self.get_service_instance_impl(self.services_impostrors.get(service_type, service_type)) 1106 1107 def make_service_live_fast_impostor_impl(self, service_type: ServiceType, is_low_latency: bool = False): 1108 return self.make_service_live_fast_impl(self.services_impostrors.get(service_type, service_type), is_low_latency) 1109 1110 def make_service_live_impostor_impl(self, service_type: ServiceType, is_low_latency: bool = False): 1111 return self.make_service_live_impl(self.services_impostrors.get(service_type, service_type), is_low_latency) 1112 1113 def make_service_dead_fast_impostor_impl(self, service_type: ServiceType): 1114 return self.make_service_dead_fast_impl(self.services_impostrors.get(service_type, service_type)) 1115 1116 def make_service_dead_impostor_impl(self, service_type: ServiceType): 1117 return self.make_service_dead_impl(self.services_impostrors.get(service_type, service_type)) 1118 1119 def set_coro_time_measurement(self, need_to_measure_coro_time: bool): 1120 previous_value = self.need_to_measure_coro_time 1121 self.need_to_measure_coro_time = need_to_measure_coro_time 1122 if need_to_measure_coro_time: 1123 self.get_coro_start_time = perf_counter 1124 else: 1125 self.get_coro_start_time = self._fake_perf_counter 1126 1127 return previous_value 1128 1129 def set_coro_history_gathering(self, need_to_gather_coro_history: bool): 1130 previous_value = self.need_to_gather_coro_history 1131 self.need_to_gather_coro_history = need_to_gather_coro_history 1132 if need_to_gather_coro_history: 1133 self.coro_history_gatherer = self._default_coro_history_gatherer 1134 else: 1135 self.coro_history_gatherer = self._fake_method 1136 1137 return previous_value 1138 1139 def _default_coro_history_gatherer(self, original_worker: Callable, coro_wrapper: 'CoroWrapperBase'): 1140 coro_id: CoroID = coro_wrapper.coro_id 1141 coro_worker: Worker = coro_wrapper.worker 1142 1143 self.coro_full_history[coro_id] = coro_wrapper 1144 1145 if original_worker not in self.coro_workers_history: 1146 self.coro_workers_history[original_worker] = set() 1147 1148 self.coro_workers_history[original_worker].add(coro_id) 1149 1150 def set_loop_iteration_time_measurement(self, need_to_measure_loop_iteration_time: bool): 1151 previous_value = self.need_to_measure_loop_iteration_time 1152 self.need_to_measure_loop_iteration_time = need_to_measure_loop_iteration_time 1153 if need_to_measure_loop_iteration_time: 1154 self.get_loop_iteration_start_time = perf_counter 1155 else: 1156 self.get_loop_iteration_start_time = self._fake_perf_counter 1157 1158 return previous_value 1159 1160 def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType: 1161 raise NotImplementedError 1162 1163 def put_coro_fast(self, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs) -> 'CoroWrapperBase': 1164 """Must not be called from coroutine. Use an appropriate service instead, since leads to incorrect greenlets switches 1165 """ 1166 if (self.current_coro_interface is None) or self.permitted_use_put_coro_from_coro: 1167 coro_type = self._new_coro_type_normalizer(coro_type) 1168 coro_id = self.coro_counter.get() 1169 self.coro_execution_time[coro_id] = 0.0 1170 self.coro_longest_execution_time[coro_id] = 0.0 1171 coro = coro_wrapper_factory(coro_type, self, coro_id, coro_worker, *args, **kwargs) 1172 if isinstance(coro_worker, EntityArgsHolder): 1173 self.coro_history_gatherer(coro_worker.entity, coro) 1174 else: 1175 self.coro_history_gatherer(coro_worker, coro) 1176 1177 self.new_born_coroutines.append(coro) 1178 if not self.in_iteration: 1179 self.on_woke_up_callback() 1180 1181 return coro 1182 else: 1183 raise RuntimeError('CoroScheduler.put_coro() method must not be called from coroutine. Use an appropriate service instead') 1184 1185 def put_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> 'CoroWrapperBase': 1186 if isinstance(coro_worker, EntityArgsHolder): 1187 coro_worker_real, args, kwargs = coro_worker.entity_args_kwargs() 1188 if isinstance(coro_worker, ExplicitWorker): 1189 coro_type: CoroType = coro_worker.coro_type 1190 coro_worker: EntityArgsHolderExplicit = EntityArgsHolderExplicit(coro_worker_real.worker, args, kwargs) 1191 return self.put_coro_fast(coro_type, coro_worker) 1192 else: 1193 return self.put_coro_fast(None, coro_worker) 1194 else: 1195 if isinstance(coro_worker, ExplicitWorker): 1196 return self.put_coro_fast(coro_worker.coro_type, coro_worker.worker, *args, **kwargs) 1197 else: 1198 return self.put_coro_fast(None, coro_worker, *args, **kwargs) 1199 1200 def register_service(self, service_type: ServiceType) -> bool: 1201 if service_type in self.services: 1202 return False 1203 else: 1204 service = service_type(self) 1205 self.services[service_type] = service 1206 self.live_services[service_type] = service 1207 return True 1208 1209 def is_service_registered(self, service_type: ServiceType) -> bool: 1210 return service_type in self.services 1211 1212 def unregister_service(self, service_type: ServiceType) -> bool: 1213 if service_type not in self.services: 1214 return True 1215 1216 service = self.services[service_type] 1217 self.services.pop(service_type, None) 1218 self.live_services.pop(service_type, None) 1219 self.live_low_latency_services.pop(service_type, None) 1220 try: 1221 service.destroy() 1222 except: 1223 self.logger.exception(f'{datetime.now()} >> Service {service_type} failed to destroy') 1224 return False 1225 1226 return True 1227 1228 def destroy_services(self): 1229 for service_type in tuple(self.services): 1230 self.unregister_service(service_type) 1231 1232 def get_service_by_type_impl(self, service_type: Type['Service']) -> 'Service': 1233 try: 1234 return self.services[service_type] 1235 except KeyError: 1236 self.register_service(service_type) 1237 return self.services[service_type] 1238 1239 def get_service_by_type_impostor_impl(self, service_type: Type['Service']) -> 'Service': 1240 return self.get_service_by_type_impl(self.services_impostrors.get(service_type, service_type)) 1241 1242 def register_service_impostor(self, service_type: Type['Service'], service_impostor_type: Optional[Type['Service']]) -> Optional[Type['Service']]: 1243 if service_impostor_type is None: 1244 an_old_impostor: Optional[Type['Service']] = self.services_impostrors.pop(service_type, None) 1245 else: 1246 an_old_impostor = self.services_impostrors.get(service_type, None) 1247 self.services_impostrors[service_type] = service_impostor_type 1248 1249 if self.services_impostrors: 1250 self.get_service_by_type = self.get_service_by_type_impostor_impl 1251 self.get_service_instance_fast = self.get_service_instance_fast_impostor_impl 1252 self.get_service_instance = self.get_service_instance_impostor_impl 1253 self.make_service_live_fast = self.make_service_live_fast_impostor_impl 1254 self.make_service_live = self.make_service_live_impostor_impl 1255 self.make_service_dead_fast = self.make_service_dead_fast_impostor_impl 1256 self.make_service_dead = self.make_service_dead_impostor_impl 1257 else: 1258 self.get_service_by_type = self.get_service_by_type_impl 1259 self.get_service_instance_fast = self.get_service_instance_fast_impl 1260 self.get_service_instance = self.get_service_instance_impl 1261 self.make_service_live_fast = self.make_service_live_fast_impl 1262 self.make_service_live = self.make_service_live_impl 1263 self.make_service_dead_fast = self.make_service_dead_fast_impl 1264 self.make_service_dead = self.make_service_dead_impl 1265 1266 return an_old_impostor 1267 1268 def find_new_born_coroutine(self, coro_id: CoroID) -> int: 1269 for index, coro in enumerate(self.new_born_coroutines): 1270 if coro.coro_id == coro_id: 1271 return index 1272 1273 return None 1274 1275 def get_coro(self, coro_id: CoroID) -> Optional['CoroWrapperBase']: 1276 if coro_id in self.coroutines: 1277 return self.coroutines[coro_id] 1278 else: 1279 new_born_coro_index = self.find_new_born_coroutine(coro_id) 1280 if new_born_coro_index is None: 1281 return None 1282 1283 return self.new_born_coroutines[new_born_coro_index] 1284 1285 @staticmethod 1286 def purify_worker(worker: Union[Callable, 'GreenletWorkerWrapper']) -> Callable: 1287 if isinstance(worker, GreenletWorkerWrapper): 1288 return worker.worker 1289 else: 1290 return worker 1291 1292 def set_global_on_start_handlers(self, turn_on_global_on_start_handlers: bool): 1293 self.global_on_start_handlers_turned_on = turn_on_global_on_start_handlers 1294 if turn_on_global_on_start_handlers: 1295 self.execute_global_on_start_handlers = self._execute_global_on_start_handlers 1296 else: 1297 self.execute_global_on_start_handlers = self._fake_execute_global_on_start_handlers 1298 1299 def add_on_global_on_start_handler(self, handler: Callable): 1300 self.coro_on_start_handlers.add(handler) 1301 1302 def _execute_global_on_start_handlers(self, coro: 'CoroWrapperBase') -> bool: 1303 # executes before new born corotine execution 1304 # if at least one of the handlers will return False - coro will not be started at all and removed from the coroutines list 1305 result = True 1306 for handler in self.coro_on_start_handlers: 1307 result &= handler(coro) 1308 1309 return result 1310 1311 def _fake_execute_global_on_start_handlers(self, coro: 'CoroWrapperBase') -> bool: 1312 # executes before new born corotine execution 1313 # if at least one of the handlers will return False - coro will not be started at all and removed from the coroutines list 1314 return True 1315 1316 1317 def process_coro_exit_status(self, coro: 'CoroWrapperBase', coro_exit_status: Optional['CoroExitStatus']): 1318 try: 1319 if coro_exit_status is None: 1320 self._run_global_on_coro_del_handlers(coro) 1321 else: 1322 if coro_exit_status.exception and (not coro_exit_status.properly_handled) and (not self.coro_on_del_handlers) and (not self.suppress_coro_exceptions): 1323 raise coro_exit_status.exception 1324 1325 coro_exit_status.properly_handled = self._run_global_on_coro_del_handlers(coro) or coro_exit_status.properly_handled 1326 1327 if coro_exit_status.exception and (not coro_exit_status.properly_handled): 1328 if self.suppress_coro_exceptions: 1329 # if __debug__: dlog(coro_exit_status.exception) 1330 # dlog(coro_exit_status.exception) 1331 self.log_exc(coro, coro_exit_status.exception) 1332 else: 1333 raise coro_exit_status.exception 1334 finally: 1335 coro.destroy() 1336 del coro 1337 1338 def kill_coro(self, coro: 'CoroWrapperBase'): 1339 coro.request_close() 1340 coro_exit_status: 'CoroExitStatus' = coro() 1341 if not coro: 1342 self.process_coro_exit_status(coro, coro_exit_status) 1343 1344 def throw_coro(self, coro: 'CoroWrapperBase', ex_type, ex_value=None, ex_traceback=None): 1345 # coro.request_close() 1346 coro.request_throw() # TODO: Investigate why it raises an exception in the loop and shuts down the loop 1347 coro_exit_status: 'CoroExitStatus' = coro(ex_type, ex_value, ex_traceback) 1348 if not coro: 1349 self.process_coro_exit_status(coro, coro_exit_status) 1350 1351 def forget_coro_by_id(self, coro: 'CoroWrapperBase') -> Optional['CoroWrapperBase']: 1352 return self.forget_coro_by_id(coro.coro_id) 1353 1354 def find_coro_by_id(self, coro_id: CoroID) -> Tuple[Optional['CoroWrapperBase'], bool, Optional[int]]: 1355 coro = None 1356 if coro_id in self.coroutines: 1357 was_new_born: bool = False 1358 coro = self.coroutines[coro_id] 1359 return coro, was_new_born, None 1360 else: 1361 was_new_born = True 1362 new_born_index = self.find_new_born_coroutine(coro_id) 1363 if new_born_index is not None: 1364 coro = self.new_born_coroutines[new_born_index] 1365 1366 return coro, was_new_born, new_born_index 1367 1368 def del_coro_by_id(self, coro_id: CoroID, was_new_born: bool, new_born_index: Optional[int] = None, request: Optional['Request'] = None): 1369 if was_new_born: 1370 del self.new_born_coroutines[new_born_index] 1371 else: 1372 self.coroutines.pop(coro_id, None) 1373 if isinstance(request, Request): 1374 try: 1375 self.requests.remove(request) 1376 except ValueError: 1377 pass 1378 1379 def kill_coro_by_id(self, coro_id: CoroID) -> bool: 1380 if self.current_coro_interface is not None: 1381 raise RuntimeError('CoroScheduler.kill_coro_by_id() Must not be called from the coroutine') 1382 1383 coro, was_new_born, new_born_index = self.find_coro_by_id(coro_id) 1384 if coro is not None: 1385 request: Optional[Request] = coro.last_result 1386 if was_new_born: 1387 self.process_coro_exit_status(coro, None) 1388 else: 1389 self.kill_coro(coro) 1390 1391 self.del_coro_by_id(coro_id, was_new_born, new_born_index, request=request) 1392 return True 1393 1394 return False 1395 1396 def throw_coro_by_id(self, coro_id: CoroID, ex_type, ex_value=None, ex_traceback=None) -> bool: 1397 if self.current_coro_interface is not None: 1398 raise RuntimeError('CoroScheduler.throw_coro_by_id() Must not be called from the coroutine') 1399 1400 coro, was_new_born, new_born_index = self.find_coro_by_id(coro_id) 1401 if coro is not None: 1402 request: Optional[Request] = coro.last_result 1403 if was_new_born: 1404 self.process_coro_exit_status(coro, None) 1405 else: 1406 self.throw_coro(coro, ex_type, ex_value, ex_traceback) 1407 1408 if not coro: 1409 self.del_coro_by_id(coro_id, was_new_born, new_born_index, request=request) 1410 1411 return True 1412 1413 return False 1414 1415 def request_coro_throw_by_id(self, coro_id: CoroID) -> Optional['CoroWrapperBase']: 1416 coro, _, _ = self.find_coro_by_id(coro_id) 1417 if coro is not None: 1418 coro.request_throw() 1419 1420 return coro 1421 1422 def request_coro_close_by_id(self, coro_id: CoroID) -> Optional['CoroWrapperBase']: 1423 coro, _, _ = self.find_coro_by_id(coro_id) 1424 if coro is not None: 1425 coro.request_close() 1426 1427 return coro 1428 1429 def _real_run_low_latency_services(self): 1430 for service_type, service in tuple(self.live_low_latency_services.items()): 1431 if service.in_work(): 1432 results = service.iteration() 1433 if results: 1434 # if __debug__: 1435 # for result in results: 1436 # if __debug__: dlog(f'λ <<< {repr(result)}') 1437 1438 self.responses.extend(results) 1439 1440 def _fake_run_low_latency_services(self): 1441 pass 1442 1443 def turn_on_embedded_mode(self, turn_on: bool = False): 1444 if turn_on: 1445 self.run_low_latency_services = self._real_run_low_latency_services 1446 else: 1447 self.run_low_latency_services = self._fake_run_low_latency_services 1448 1449 def add_global_on_coro_del_handler(self, callback: OnCoroDelHandler): 1450 self.coro_on_del_handlers.add(callback) 1451 1452 def _run_global_on_coro_del_handlers(self, coro: 'CoroWrapperBase') -> bool: 1453 if not self.coro_on_del_handlers: 1454 return False 1455 1456 exception_properly_handled = False 1457 for handler in self.coro_on_del_handlers: 1458 exception_properly_handled = handler(coro) or exception_properly_handled 1459 return exception_properly_handled 1460 1461 def restore_global_state(self): 1462 """ 1463 Must be run immediately after `await ...` from inside Coro or Service when running from inside external 1464 async loop 1465 :return: 1466 """ 1467 _current_coro_scheduler.scheduler = self 1468 1469 @staticmethod 1470 def current_loop() -> Optional['CoroSchedulerType']: 1471 return _current_coro_scheduler.scheduler 1472 1473 def current_interface(self) -> Optional['Interface']: 1474 return self.current_coro_interface 1475 1476 def current_wrapper(self) -> Optional['CoroWrapperBase']: 1477 return self.current_coro_wrapper 1478 1479 def destroy_all_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None): 1480 while self.new_born_coroutines or self.coroutines: 1481 new_born_coroutines_buff = self.new_born_coroutines 1482 # self.new_born_coroutines = type(new_born_coroutines_buff)() 1483 self.new_born_coroutines = list() 1484 for coro in new_born_coroutines_buff: 1485 coro: 'CoroWrapperBase' = coro 1486 coro_id: CoroID = coro.coro_id 1487 if (ex_type is not None) or (ex_value is not None): 1488 self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback) 1489 1490 if coro: 1491 self.kill_coro_by_id(coro_id) 1492 1493 coroutines_buff = self.coroutines 1494 # self.coroutines = type(coroutines_buff)() 1495 self.coroutines = type(coroutines_buff)() 1496 for coro in coroutines_buff.values(): 1497 coro: 'CoroWrapperBase' = coro 1498 coro_id: CoroID = coro.coro_id 1499 if (ex_type is not None) or (ex_value is not None): 1500 self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback) 1501 1502 if coro: 1503 self.kill_coro_by_id(coro_id) 1504 1505 def destroy_new_born_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None): 1506 while self.new_born_coroutines: 1507 new_born_coroutines_buff = self.new_born_coroutines 1508 # self.new_born_coroutines = type(new_born_coroutines_buff)() 1509 self.new_born_coroutines = list() 1510 for coro in new_born_coroutines_buff: 1511 coro: 'CoroWrapperBase' = coro 1512 coro_id: CoroID = coro.coro_id 1513 if (ex_type is not None) or (ex_value is not None): 1514 self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback) 1515 1516 if coro: 1517 self.kill_coro_by_id(coro_id) 1518 1519 def destroy_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None): 1520 while self.coroutines: 1521 coroutines_buff = self.coroutines 1522 # self.coroutines = type(coroutines_buff)() 1523 self.coroutines = dict() 1524 coroutines_buff_values = coroutines_buff.values() 1525 for coro in coroutines_buff_values: 1526 coro: 'CoroWrapperBase' = coro 1527 coro_id: CoroID = coro.coro_id 1528 if (ex_type is not None) or (ex_value is not None): 1529 self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback) 1530 1531 if coro: 1532 self.kill_coro_by_id(coro_id) 1533 1534 def destroy(self): 1535 if self.need_to_log_loop_start_and_end: 1536 self.logger.info(f'{datetime.now()} >> {type(self).__name__} destroy...') 1537 1538 if not self._destroyed: 1539 def create_entity_during_destroy(*args, **kwargs): 1540 raise CoroSchedulerIsCurrentlyDestroingError 1541 1542 put_coro_fast_buf = self.put_coro_fast 1543 self.put_coro_fast = create_entity_during_destroy 1544 register_service_bak = self.register_service 1545 self.register_service = create_entity_during_destroy 1546 1547 try: 1548 while self.new_born_coroutines or self.coroutines: 1549 new_born_coroutines_buff = self.new_born_coroutines 1550 # self.new_born_coroutines = type(new_born_coroutines_buff)() 1551 self.new_born_coroutines = list() 1552 for coro in new_born_coroutines_buff: 1553 coro: 'CoroWrapperBase' = coro 1554 coro_id: CoroID = coro.coro_id 1555 repr_coro: str = repr(coro) 1556 try: 1557 self.throw_coro_by_id(coro_id, CoroSchedulerDestroyException) 1558 except CoroSchedulerIsCurrentlyDestroingError: 1559 self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during throw CoroSchedulerDestroyException to new born coroutine {repr_coro}') 1560 pass 1561 1562 try: 1563 if coro: 1564 self.kill_coro_by_id(coro_id) 1565 except CoroSchedulerIsCurrentlyDestroingError: 1566 self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during kill new born coroutine {repr_coro}') 1567 pass 1568 1569 coroutines_buff = self.coroutines 1570 # self.coroutines = type(coroutines_buff)() 1571 self.coroutines = dict() 1572 for coro in coroutines_buff.values(): 1573 coro: 'CoroWrapperBase' = coro 1574 coro_id: CoroID = coro.coro_id 1575 repr_coro: str = repr(coro) 1576 try: 1577 self.throw_coro_by_id(coro_id, CoroSchedulerDestroyException) 1578 except CoroSchedulerIsCurrentlyDestroingError: 1579 self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during throw CoroSchedulerDestroyException to coroutine {repr_coro}') 1580 pass 1581 1582 try: 1583 if coro: 1584 self.kill_coro_by_id(coro_id) 1585 except CoroSchedulerIsCurrentlyDestroingError: 1586 self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during kill coroutine {repr_coro}') 1587 pass 1588 1589 # self.requests = type(self.requests)() 1590 self.requests = list() 1591 # self.responses = type(self.responses)() 1592 self.responses = list() 1593 1594 for service_type in tuple(self.services): 1595 try: 1596 self.unregister_service(service_type) 1597 except CoroSchedulerIsCurrentlyDestroingError: 1598 self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during destroy service "{service_type}"') 1599 pass 1600 1601 # self.requests = type(self.requests)() 1602 self.requests = list() 1603 # self.responses = type(self.responses)() 1604 self.responses = list() 1605 finally: 1606 self.put_coro_fast = put_coro_fast_buf 1607 self.register_service = register_service_bak 1608 self._destroyed = True 1609 for on_destroyed_handler in self.on_destroyed_handlers: 1610 on_destroyed_handler() 1611 1612 if self.need_to_log_loop_start_and_end: 1613 self.logger.info(f'{datetime.now()} >> {type(self).__name__} destroyed.') 1614 1615 self.logger.removeHandler(self.logger_stream_handler) 1616 1617 1618 def _update_in_work(self): 1619 # self._in_work = self.new_born_coroutines or self.coroutines or self.services_in_work 1620 self._in_work = (self.foreground_coro_num > 0) or self.services_in_foreground_work 1621 return self._in_work 1622 1623 def in_work(self): 1624 in_work_result = self._in_work 1625 return self._update_in_work() if in_work_result is None else in_work_result 1626 1627 def _update_is_awake(self): 1628 self._is_awake = self.new_born_coroutines or self.coroutines or self.live_services or self.live_low_latency_services 1629 return self._is_awake 1630 1631 def is_awake(self): 1632 is_awake_result = self._is_awake 1633 return self._update_is_awake() if is_awake_result is None else is_awake_result 1634 1635 def is_idle(self) -> bool: 1636 # return (self.next_event_after() is not None) and (not self.new_born_coroutines) and (not self.services_in_active_state) and (not self.responses) 1637 return (not self.new_born_coroutines) and (not self.services_in_active_state) and (0 == (len(self.responses) - self.idle_managers_num)) 1638 1639 def next_event_after(self) -> Optional[Union[float, int]]: 1640 return self.time_left_before_next_event 1641 1642 def _loop_imp(self): 1643 try: 1644 next_event_after = None 1645 # while self.in_work() or (next_event_after is not None): 1646 while self.in_work(): 1647 self._iteration_imp() 1648 next_event_after = self.next_event_after() 1649 # if (next_event_after is not None) and (not self.new_born_coroutines) and (not self.services_in_active_state) and (not self.responses): 1650 # if self.use_internal_sleep or (not self.on_idle_handlers): 1651 # sleep(next_event_after) 1652 # else: 1653 # for handler in self.on_idle_handlers: 1654 # handler(next_event_after) 1655 # # else: 1656 # # print(self.services_in_active_state_list) 1657 1658 if self.is_idle(): 1659 if self.use_internal_sleep: 1660 if next_event_after is None: 1661 sleep(high_cpu_utilisation_mode=self.high_cpu_utilisation_mode) 1662 else: 1663 sleep(next_event_after, high_cpu_utilisation_mode=self.high_cpu_utilisation_mode) 1664 1665 for handler in self.on_idle_handlers: 1666 handler(next_event_after) 1667 except CoroSchedulerDestroyRequestedException: 1668 pass 1669 finally: 1670 self.destroy() 1671 1672 def _iteration_imp(self): 1673 current_coro_scheduler_buff = _current_coro_scheduler.scheduler 1674 _current_coro_scheduler.scheduler = self 1675 1676 self.in_iteration = True 1677 1678 cpu_clock_cycles_start_time = cpu_clock_cycles() 1679 # minus_delta_time = 0 1680 try: 1681 self.context_switches += len(self.new_born_coroutines) + len(self.responses) 1682 self.loop_iteration_start_time = self.get_loop_iteration_start_time() 1683 1684 if not self.keep_coro_execution_time_between_iterations: 1685 # self.coro_execution_time = type(self.coro_execution_time)() 1686 self.coro_execution_time = dict() 1687 if not self.keep_coro_longest_execution_time_between_iterations: 1688 # self.coro_longest_execution_time = type(self.coro_longest_execution_time)() 1689 self.coro_longest_execution_time = dict() 1690 if not self.keep_coro_workers_history_between_iterations: 1691 # self.coro_workers_history = type(self.coro_workers_history)() 1692 self.coro_workers_history = dict() 1693 if not self.keep_coro_full_history_between_iterations: 1694 # self.coro_full_history = type(self.coro_full_history)() 1695 self.coro_full_history = dict() 1696 1697 # minus_start_time = cpu_clock_cycles() 1698 self.run_low_latency_services() 1699 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1700 1701 new_born_coroutines_buff = self.new_born_coroutines 1702 # self.new_born_coroutines = type(new_born_coroutines_buff)() 1703 1704 # TODO: we need to give first coro a list of all coros and an index (0 at the moment) 1705 # coro will make required work, find next Greenlet coro in the list and will swithch 1706 # to it given same list and an updated index. Next coro will do the same. 1707 # then an every coro returns and as result we will return here. After that, 1708 # loop will process all Asyncio coroutines in a normal maner 1709 # if self._coroutines_can_switch_directly: 1710 # pass 1711 1712 self.new_born_coroutines = list() 1713 self._is_current_coro_was_new_born = True 1714 for coro in new_born_coroutines_buff: 1715 coro: CoroWrapperBase = coro 1716 if self.execute_global_on_start_handlers(coro): 1717 self.current_coro_start_time = self.get_coro_start_time() 1718 self.current_coro_wrapper = coro 1719 # minus_start_time = cpu_clock_cycles() 1720 coro_exit_status: CoroExitStatus = coro.init(self.root_coro) 1721 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1722 self.current_coro_wrapper = None 1723 coro_execution_piece_delta_time = self.get_coro_start_time() - self.current_coro_start_time 1724 self.current_coro_start_time = None 1725 coro_id = coro.coro_id 1726 if coro_id not in self.coro_execution_time: 1727 self.coro_execution_time[coro_id] = 0 1728 1729 self.coro_execution_time[coro_id] += coro_execution_piece_delta_time 1730 if coro_id not in self.coro_longest_execution_time: 1731 self.coro_longest_execution_time[coro_id] = 0 1732 1733 self.coro_longest_execution_time[coro_id] = max(self.coro_longest_execution_time[coro_id], coro_execution_piece_delta_time) 1734 if coro: 1735 coro_last_result = coro.last_result 1736 if not isinstance(coro_last_result, Request): 1737 if self.on_wrong_request is None: 1738 # minus_start_time = cpu_clock_cycles() 1739 self.logger.warning(f'{datetime.now()} >> Wrong request {repr(coro_last_result)} of type {type(coro_last_result)} from coroutine {repr(coro)}') 1740 self.kill_coro_by_id(coro_id) 1741 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1742 continue 1743 else: 1744 # minus_start_time = cpu_clock_cycles() 1745 coro_last_result = self.on_wrong_request(coro, coro_last_result) 1746 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1747 1748 self.requests.append(coro_last_result) 1749 self.coroutines[coro_id] = coro 1750 continue 1751 else: 1752 # minus_start_time = cpu_clock_cycles() 1753 self.process_coro_exit_status(coro, coro_exit_status) 1754 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1755 1756 responses_buff = self.responses 1757 # self.responses = type(responses_buff)() 1758 1759 # TODO: we need to give first coro a list of all coros and an index (0 at the moment) 1760 # coro will make required work, find next Greenlet coro in the list and will swithch 1761 # to it given same list and an updated index. Next coro will do the same. 1762 # then an every coro returns and as result we will return here. After that, 1763 # loop will process all Asyncio coroutines in a normal maner 1764 # if self._coroutines_can_switch_directly: 1765 # pass 1766 1767 self.responses = list() 1768 self._is_current_coro_was_new_born = False 1769 for response in responses_buff: 1770 coro_id = response.coro_id 1771 if coro_id not in self.coroutines: 1772 if not self.suppress_warnings_about_responses_to_not_existant_coroutines: 1773 self.logger.warning(f'{datetime.now()} >> Coroutine {coro_id} has a response but does not exists: {repr(response)}') 1774 1775 continue 1776 1777 coro = self.coroutines[coro_id] 1778 1779 self.current_coro_start_time = self.get_coro_start_time() 1780 self.current_coro_wrapper = coro 1781 if isinstance(response, DirectResponse): 1782 response = response.response 1783 1784 # minus_start_time = cpu_clock_cycles() 1785 coro_exit_status: CoroExitStatus = coro(response) 1786 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1787 self.current_coro_wrapper = None 1788 coro_execution_piece_delta_time = self.get_coro_start_time() - self.current_coro_start_time 1789 self.current_coro_start_time = None 1790 coro_id = coro.coro_id 1791 if coro_id not in self.coro_execution_time: 1792 self.coro_execution_time[coro_id] = 0 1793 1794 self.coro_execution_time[coro_id] += coro_execution_piece_delta_time 1795 if coro_id not in self.coro_longest_execution_time: 1796 self.coro_longest_execution_time[coro_id] = 0 1797 1798 self.coro_longest_execution_time[coro_id] = max(self.coro_longest_execution_time[coro_id], coro_execution_piece_delta_time) 1799 if coro: 1800 coro_last_result = coro.last_result 1801 if not isinstance(coro_last_result, Request): 1802 if self.on_wrong_request is None: 1803 # minus_start_time = cpu_clock_cycles() 1804 self.logger.warning(f'{datetime.now()} >> Wrong request {repr(coro_last_result)} of type {type(coro_last_result)} from coroutine {repr(coro)}') 1805 self.kill_coro_by_id(coro_id) 1806 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1807 continue 1808 else: 1809 # minus_start_time = cpu_clock_cycles() 1810 coro_last_result = self.on_wrong_request(coro, coro_last_result) 1811 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1812 1813 self.requests.append(coro_last_result) 1814 continue 1815 else: 1816 del self.coroutines[coro_id] 1817 # minus_start_time = cpu_clock_cycles() 1818 self.process_coro_exit_status(coro, coro_exit_status) 1819 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1820 1821 self._is_current_coro_was_new_born = None 1822 requests_buff = self.requests 1823 # self.requests = type(requests_buff)() 1824 self.requests = list() 1825 for request in requests_buff: 1826 # if __debug__: dlog(f'λ >>> {repr(request)}') 1827 service_type = request.service_type 1828 service: 'Service' = self.get_service_by_type(service_type) 1829 # minus_start_time = cpu_clock_cycles() 1830 result = service.put_task(request) 1831 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1832 if result is not None: 1833 # if __debug__: dlog(f'λ <<< {repr(result)}') 1834 self.responses.append(result) 1835 1836 self.services_in_work = 0 1837 self.services_in_foreground_work = 0 1838 self.services_in_active_state = 0 1839 # self.services_in_active_state_list = list() 1840 self.time_left_before_next_event = None 1841 for service_type, service in tuple(self.live_services.items()): 1842 # minus_start_time = cpu_clock_cycles() 1843 in_work_result = service.in_work() 1844 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1845 if in_work_result: 1846 self.services_in_work += 1 1847 self.services_in_active_state += 1 1848 # minus_start_time = cpu_clock_cycles() 1849 results = service.iteration() 1850 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1851 # minus_start_time = cpu_clock_cycles() 1852 in_work_result = service.in_work() 1853 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1854 if in_work_result: 1855 self.services_in_foreground_work += 1 if service.in_forground_work() else 0 1856 # minus_start_time = cpu_clock_cycles() 1857 has_planned_events, time_left_before_next_event = service.time_left_before_next_event() 1858 # minus_delta_time += cpu_clock_cycles() - minus_start_time 1859 if has_planned_events and (time_left_before_next_event is not None): 1860 self.services_in_active_state -= 1 1861 if self.time_left_before_next_event is None: 1862 self.time_left_before_next_event = time_left_before_next_event 1863 elif time_left_before_next_event < self.time_left_before_next_event: 1864 self.time_left_before_next_event = time_left_before_next_event 1865 # else: 1866 # self.services_in_active_state_list.append(service_type) 1867 else: 1868 self.services_in_work -= 1 1869 self.services_in_active_state -= 1 1870 # self.services_in_active_state_list.append(service_type) 1871 1872 if results: 1873 # if __debug__: 1874 # for result in results: 1875 # if __debug__: dlog(f'λ <<< {repr(result)}') 1876 1877 self.responses.extend(results) 1878 except CoroSchedulerDestroyRequestedException: 1879 raise 1880 except: 1881 # if __debug__: dlog('Loop Exception') 1882 self.logger.exception(f'{datetime.now()} >> Loop Exception') 1883 raise 1884 finally: 1885 self._update_in_work() 1886 self._update_is_awake() 1887 self.loop_iteration_delta_time = self.get_loop_iteration_start_time() - self.loop_iteration_start_time 1888 self.loop_cpu_clock_cycles = cpu_clock_cycles() - cpu_clock_cycles_start_time 1889 # self.sliding_window.append(cpu_clock_cycles() - cpu_clock_cycles_start_time - minus_delta_time) 1890 # self.sliding_window.append(minus_delta_time) 1891 self.iteration_index += 1 1892 # print() 1893 _current_coro_scheduler.scheduler = current_coro_scheduler_buff 1894 self.in_iteration = False 1895 1896 1897class CoroSchedulerGreenlet(CoroSchedulerBase): 1898 def __init__(self, logger: Optional[logging.Logger] = None): 1899 super().__init__(logger) 1900 self.root_coro_loop = greenlet(self._loop_imp) # type: Coro 1901 # self.root_coro_iteration = greenlet(self._iteration_imp) # type: Coro 1902 self.root_coro_iteration = greenlet(self._iter_wrapper) # type: Coro 1903 # self._root_coro = None # type: Optional[Coro] 1904 self.root_coro = None # type: Optional[Coro] 1905 self._coroutines_can_switch_directly = True 1906 1907 def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType: 1908 return coro_type or CoroType.auto 1909 1910 def loop(self): 1911 if self.need_to_log_loop_start_and_end: 1912 self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...') 1913 1914 self.root_coro = self.root_coro_loop 1915 1916 # if __debug__: dlog('Switch to root_coro') 1917 self.root_coro.switch() 1918 # if __debug__: dlog('Switch from root_coro') 1919 self.root_coro = None 1920 self.root_coro_loop = greenlet(self._loop_imp) 1921 1922 def iteration(self): 1923 # global _debug_log_counter 1924 # last_debug_log_counter = _debug_log_counter 1925 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1926 1927 self.root_coro = self.root_coro_iteration 1928 in_work = False 1929 try: 1930 # if __debug__: dlog('Switch to root_coro (iteration)') 1931 in_work = self.root_coro.switch(True) 1932 # if __debug__: dlog('Switch from root_coro (iteration)') 1933 except GreenletExit: 1934 self.stop_iteration() 1935 # if __debug__: dlog('Switch from root_coro (GreenletExit)') 1936 except: 1937 self.stop_iteration() 1938 # if __debug__: dlog('Switch from root_coro (Exception):') 1939 # if __debug__: dlog(sys.exc_info()) 1940 raise 1941 finally: 1942 self.root_coro = None 1943 # if __debug__: 1944 # if last_debug_log_counter < _debug_log_counter: 1945 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1946 1947 return in_work 1948 1949 def stop_iteration(self): 1950 try: 1951 in_work = self.root_coro.switch(False) 1952 except GreenletExit: 1953 self.stop_iteration() 1954 # if __debug__: dlog('Stop root_coro (GreenletExit)') 1955 except: 1956 self.stop_iteration() 1957 # if __debug__: dlog('Stop root_coro (Exception):') 1958 # if __debug__: dlog(sys.exc_info()) 1959 raise 1960 finally: 1961 self.root_coro = None 1962 self.root_coro_iteration = greenlet(self._iter_wrapper) 1963 1964 def _iter_wrapper(self, proceed: bool=True): 1965 try: 1966 while proceed: 1967 self._iteration_imp() 1968 proceed = greenlet.getcurrent().parent.switch(self.in_work()) 1969 except CoroSchedulerDestroyRequestedException: 1970 pass 1971 finally: 1972 self.destroy() 1973 1974 1975class CoroSchedulerAwaitable(CoroSchedulerBase): 1976 def __init__(self, logger: Optional[logging.Logger] = None): 1977 super().__init__(logger) 1978 self.root_coro = None # type: Optional[Coro] 1979 1980 def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType: 1981 return CoroType.awaitable 1982 1983 def loop(self): 1984 if self.need_to_log_loop_start_and_end: 1985 self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...') 1986 1987 self._loop_imp() 1988 1989 def iteration(self): 1990 # global _debug_log_counter 1991 # last_debug_log_counter = _debug_log_counter 1992 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1993 1994 need_to_destroy = False 1995 try: 1996 self._iteration_imp() 1997 return self.in_work() 1998 except CoroSchedulerDestroyRequestedException: 1999 need_to_destroy = True 2000 except: 2001 need_to_destroy = True 2002 raise 2003 finally: 2004 # if __debug__: 2005 # if last_debug_log_counter < _debug_log_counter: 2006 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 2007 2008 if need_to_destroy: 2009 self.destroy() 2010 2011 2012CoroScheduler = CoroSchedulerGreenlet if greenlet_awailable else CoroSchedulerAwaitable 2013 2014 2015CoroSchedulerType: TypeAlias = Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable] 2016 2017 2018def current_interface() -> Optional['Interface']: 2019 return current_coro_scheduler().current_interface() 2020 2021 2022def execute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any: 2023 return coro_worker(current_interface(), *args, **kwargs) 2024 2025 2026exec_coro = execute_coro 2027ecoro = execute_coro 2028 2029 2030async def aexecute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any: 2031 return await coro_worker(current_interface(), *args, **kwargs) 2032 2033 2034aexec_coro = aexecute_coro 2035aecoro = aexecute_coro 2036 2037 2038@contextmanager 2039def around_await(): 2040 """ 2041 It is very bad idea to await from inside the greenlet since it might lead to problems with stack 2042 state in an outside awaitable code and it's loop. 2043 Must be run around `await ...` from inside Coro or Service when running from inside external async loop 2044 :return: 2045 """ 2046 loop = _current_coro_scheduler.scheduler 2047 try: 2048 yield 2049 finally: 2050 _current_coro_scheduler.scheduler = loop 2051 2052 2053class Request: 2054 def __init__(self, coro: 'CoroWrapperBase', service_type: ServiceType, *args, **kwargs): 2055 self.coro = coro 2056 self.coro_id = coro.coro_id 2057 self.service_type = service_type 2058 self.args = args 2059 self.kwargs = kwargs 2060 2061 def __repr__(self): 2062 return f'<{self.__class__.__name__}(coro: {self.coro}, coro_id: {self.coro_id}, service_type: {self.service_type}, args: {self.args}, kwargs: {self.kwargs})>' 2063 2064 2065class Response: 2066 def __init__(self, coro_id: CoroID, service_type: ServiceType, response: Any, exception: Optional[BaseException]=None): 2067 self.coro_id = coro_id 2068 self.service_type = service_type 2069 self.response = response 2070 self.exception = exception 2071 2072 def __call__(self) -> Any: 2073 if self.exception: 2074 raise self.exception 2075 return self.response 2076 2077 def __repr__(self): 2078 return f'<{self.__class__.__name__}(coro_id: {self.coro_id}, service_type: {self.service_type}, response: {self.response}, exception: {self.exception})>' 2079 2080 2081class DirectResponse(Response): 2082 pass 2083 2084 2085class CoroExitStatus: 2086 def __init__(self, exception, properly_handled): 2087 self.exception = exception 2088 self.properly_handled = properly_handled 2089 2090 2091class CoroWrapperBase: 2092 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2093 if isinstance(worker, EntityArgsHolder): 2094 worker, args, kwargs = worker.entity_args_kwargs() 2095 2096 self.worker = worker # type: Worker 2097 self.init_args = args # type: Tuple[Any, ...] 2098 self.init_kwargs = kwargs # type: Dict 2099 self.coro_id: CoroID = coro_id # type: CoroID 2100 self.loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2101 self.coro = None # type: Optional[Coro] 2102 self.parent_coro = None # type: Optional[Coro] 2103 self.interface = None # type: Optional[Interface] 2104 self.last_result = None # type: Optional[Union[Request, Any]] # return value can be any 2105 self.exception = None # type: Optional[Exception] # If there was an exception 2106 self.coro_on_del_handlers = set() # type: Set[Callable] 2107 self._make_coro_method = self._raise_not_implemented_error # type: Callable 2108 self._make_interface = self._raise_not_implemented_error # type: Callable 2109 self._init_method = self._raise_not_implemented_error # type: Callable 2110 self._call_method = self._raise_not_implemented_error # type: Callable 2111 self._throw_method = self._raise_not_implemented_error # type: Callable 2112 self._close_method = self._raise_not_implemented_error # type: Callable 2113 self._current_call_method = self._call_method # type: Callable 2114 self._is_background_coro: bool = False 2115 self.loop.foreground_coro_num += 1 2116 2117 def _travers_through_coro_on_del_handlers(self, coro_exit_status: 'CoroExitStatus'): 2118 if not self.coro_on_del_handlers: 2119 return coro_exit_status 2120 2121 exception_properly_handled = coro_exit_status.properly_handled 2122 for handler in self.coro_on_del_handlers: 2123 exception_properly_handled = handler(self) or exception_properly_handled 2124 coro_exit_status.properly_handled = exception_properly_handled 2125 return coro_exit_status 2126 2127 def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']: 2128 self.parent_coro = parent_coro 2129 self.coro = self._make_coro_method() 2130 self.interface = self._make_interface() 2131 current_coro_interface_buff = self.loop.current_coro_interface 2132 self.loop.current_coro_interface = self.interface 2133 try: 2134 self._init_method(self.init_args, self.init_kwargs) 2135 if self: 2136 return None 2137 else: 2138 # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}') 2139 return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True)) 2140 except: 2141 self.exception = get_exception() 2142 if not self.coro_on_del_handlers: 2143 return CoroExitStatus(self.exception, False) 2144 return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False)) 2145 finally: 2146 self.loop.current_coro_interface = current_coro_interface_buff 2147 2148 def __call__(self, *args, **kwargs) -> Union[None, 'CoroExitStatus']: 2149 current_coro_interface_buff = self.loop.current_coro_interface 2150 self.loop.current_coro_interface = self.interface 2151 try: 2152 self._current_call_method(*args, **kwargs) 2153 if self: 2154 return None 2155 else: 2156 # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}') 2157 return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True)) 2158 except: 2159 self.exception = get_exception() 2160 if not self.coro_on_del_handlers: 2161 return CoroExitStatus(self.exception, False) 2162 return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False)) 2163 finally: 2164 self.loop.current_coro_interface = current_coro_interface_buff 2165 2166 def __bool__(self) -> bool: 2167 raise NotImplementedError 2168 2169 def add_on_coro_del_handler(self, callback: OnCoroDelHandler): 2170 self.coro_on_del_handlers.add(callback) 2171 2172 def _raise_not_implemented_error(self, *args, **kwargs): 2173 pass 2174 raise NotImplementedError 2175 return self._raise_not_implemented_error # Suppressing lint error 2176 2177 def _current_throw_method_helper(self, *args, **kwargs): 2178 self._throw_method(*args, **kwargs) 2179 self._current_call_method = self._call_method 2180 2181 def request_throw(self) -> Any: 2182 self._current_call_method = self._current_throw_method_helper 2183 2184 def _current_close_method_helper(self, *args, **kwargs): 2185 self._close_method() 2186 self._current_call_method = self._call_method 2187 2188 def request_close(self) -> Any: 2189 self._current_call_method = self._current_close_method_helper 2190 2191 @property 2192 def is_background_coro(self): 2193 return self._is_background_coro 2194 2195 @is_background_coro.setter 2196 def is_background_coro(self, value: bool): 2197 if value: 2198 if not self._is_background_coro: 2199 self.loop.foreground_coro_num -= 1 2200 else: 2201 if self._is_background_coro: 2202 self.loop.foreground_coro_num += 1 2203 2204 self._is_background_coro = value 2205 2206 def destroy(self): 2207 if self.interface is not None: 2208 if isinstance(self.interface, Interface): 2209 self.interface.destroy() 2210 2211 if not self.is_background_coro: 2212 self.loop.foreground_coro_num -= 1 2213 2214 self._is_background_coro = None 2215 self.init_args = None 2216 self.init_kwargs = None 2217 self.coro_id = None 2218 self.worker = None 2219 self.loop = None 2220 self.coro = None 2221 self.parent_coro = None 2222 self.interface = None 2223 self.last_result = None 2224 self.exception = None 2225 self.coro_on_del_handlers = None 2226 self._make_coro_method = None 2227 self._make_interface = None 2228 self._init_method = None 2229 self._call_method = None 2230 self._throw_method = None 2231 self._close_method = None 2232 self._current_call_method = None 2233 2234 2235class CoroWrapperGreenlet(CoroWrapperBase): 2236 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2237 super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs) 2238 self._make_coro_method = self._make_coro_method_imp 2239 self._make_interface = self._make_interface_imp 2240 self._init_method = self._init_method_imp 2241 self._call_method = self._call_method_imp 2242 self._throw_method = self._throw_method_imp # type: Callable 2243 self._close_method = self._close_method_imp # type: Callable 2244 self._current_call_method = self._call_method # type: Callable 2245 2246 def _make_coro_method_imp(self): 2247 return greenlet(self.worker, self.parent_coro) 2248 2249 def _make_interface_imp(self): 2250 return InterfaceGreenlet(self.loop, self) 2251 2252 def _init_method_imp(self, init_args, init_kwargs): 2253 try: 2254 # if __debug__: dlog(f'λ => (init): {func_info(self.worker.worker)}') 2255 self.last_result = self.coro.switch(self.interface, *init_args, **init_kwargs) # TODO: wrong 2256 # if __debug__: dlog(f'λ <= (init): {repr(self.worker.worker)}') 2257 except GreenletExit: 2258 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2259 self.last_result = self.interface.result 2260 except: 2261 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2262 self.last_result = None 2263 raise 2264 2265 def _call_method_imp(self, *args, **kwargs): 2266 try: 2267 # if __debug__: dlog(f'λ => (call): {func_info(self.worker.worker)}') 2268 self.last_result = self.coro.switch(*args, **kwargs) # TODO: wrong 2269 # if __debug__: dlog(f'λ <= (call): {repr(self.worker.worker)}') 2270 except GreenletExit: 2271 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2272 self.last_result = self.interface.result 2273 except: 2274 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2275 self.last_result = None 2276 raise 2277 2278 # def _throw_method_imp(self, *args, **kwargs): 2279 def _throw_method_imp(self, ex_type, ex_value=None, ex_traceback=None): 2280 try: 2281 # if __debug__: dlog(f'λ => (throw): {func_info(self.worker.worker)}') 2282 self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback) 2283 # if __debug__: dlog(f'λ <= (throw): {repr(self.worker.worker)}') 2284 except GreenletExit: 2285 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2286 self.last_result = self.interface.result 2287 except: 2288 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2289 self.last_result = None 2290 raise 2291 2292 def _close_method_imp(self): 2293 try: 2294 # if __debug__: dlog(f'λ => (close): {func_info(self.worker.worker)}') 2295 self.last_result = self.coro.throw() 2296 # if __debug__: dlog(f'λ <= (close): {repr(self.worker.worker)}') 2297 except GreenletExit: 2298 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2299 self.last_result = self.interface.result 2300 except: 2301 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2302 self.last_result = None 2303 raise 2304 2305 def __bool__(self) -> bool: 2306 return bool(self.coro) 2307 2308 def destroy(self): 2309 if isinstance(self.worker, GreenletWorkerWrapper): 2310 self.worker.destroy() 2311 2312 return super().destroy() 2313 2314 2315async def init_asyncgenerator(entity: Callable): 2316 try: 2317 result = await entity.asend(None) 2318 return (result, None) 2319 except: 2320 exception = get_exception() 2321 return (None, exception) 2322 2323 2324async def call_asyncgenerator(entity: Callable, *args, **kwargs): 2325 try: 2326 result = await entity.asend(*args, **kwargs) 2327 return (result, None) 2328 except: 2329 exception = get_exception() 2330 return (None, exception) 2331 2332 2333async def throw_asyncgenerator(entity: Callable, *args, **kwargs): 2334 try: 2335 result = await entity.athrow(*args, **kwargs) # await entity.athrow(type[, value[, traceback]]) ; https://docs.python.org/3/reference/expressions.html#agen.asend 2336 return (result, None) 2337 except: 2338 exception = get_exception() 2339 return (None, exception) 2340 2341 2342async def close_asyncgenerator(entity: Callable): 2343 try: 2344 result = await entity.aclose() 2345 return (result, None) 2346 except: 2347 exception = get_exception() 2348 return (None, exception) 2349 2350 2351async def init_asyncgeneratorfunction(entity: Callable, *args, **kwargs): 2352 try: 2353 entity = entity(*args, **kwargs) 2354 result = await entity.asend(None) 2355 return (entity, result, None) 2356 except: 2357 exception = get_exception() 2358 return (entity, None, exception) 2359 2360 2361async def awaitable_wrapper(entity: Awaitable): 2362 return await entity 2363 2364 2365async def callable_wrapper(entity: Callable, *args, **kwargs): 2366 return entity(*args, **kwargs) 2367 2368 2369class CoroWrapperAsyncAwait(CoroWrapperBase): 2370 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2371 super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs) 2372 self._make_coro_method = self._make_coro_method_imp 2373 self._make_interface = self._make_interface_imp 2374 self.in_run_state = False 2375 self.subtype = self._setup_subtype() 2376 self._current_call_method = self._call_method # type: Callable 2377 2378 def _make_coro_method_imp(self): 2379 return self.worker 2380 2381 def _make_interface_imp(self): 2382 return InterfaceAsyncAwait(self.loop, self) 2383 2384 def __bool__(self) -> bool: 2385 return bool(self.in_run_state) 2386 2387 def _setup_subtype(self): 2388 if inspect.iscoroutine(self.worker): 2389 # if __debug__: dlog('λ >>>\tCOROUTINE') 2390 self._init_method = self._init_coroutine 2391 self._call_method = self._call_coroutine 2392 self._throw_method = self._throw_coroutine 2393 self._close_method = self._close_coroutine 2394 return 0 2395 elif inspect.isgenerator(self.worker): 2396 # if __debug__: dlog('λ >>>\tGENERATOR') 2397 self._init_method = self._init_generator 2398 self._call_method = self._call_coroutine 2399 self._throw_method = self._throw_coroutine 2400 self._close_method = self._close_coroutine 2401 return 1 2402 elif inspect.iscoroutinefunction(self.worker): 2403 # if __debug__: dlog('λ >>>\tCOROUTINE FUNCTION') 2404 self._init_method = self._init_coroutinefunction 2405 self._call_method = self._call_coroutine 2406 self._throw_method = self._throw_coroutine 2407 self._close_method = self._close_coroutine 2408 return 2 2409 elif inspect.isgeneratorfunction(self.worker): 2410 # if __debug__: dlog('λ >>>\tGENERATOR FUNCTION') 2411 self._init_method = self._init_generatorfunction 2412 self._call_method = self._call_coroutine 2413 self._throw_method = self._throw_coroutine 2414 self._close_method = self._close_coroutine 2415 return 3 2416 elif inspect.isasyncgen(self.worker): 2417 # if __debug__: dlog('λ >>>\tASYNC GENERATOR') 2418 self._init_method = self._init_asyncgenerator 2419 self._call_method = self._call_asyncgenerator 2420 self._throw_method = self._throw_asyncgenerator 2421 self._close_method = self._close_asyncgenerator 2422 return 4 2423 elif inspect.isasyncgenfunction(self.worker): 2424 # if __debug__: dlog('λ >>>\tASYNC GENERATOR FUNCTION') 2425 self._init_method = self._init_asyncgeneratorfunction 2426 self._call_method = self._call_asyncgenerator 2427 self._throw_method = self._throw_asyncgenerator 2428 self._close_method = self._close_asyncgenerator 2429 return 5 2430 elif inspect.isawaitable(self.worker): 2431 # if __debug__: dlog('λ >>>\tAWAITABLE') 2432 self._init_method = self._init_awaitable 2433 self._call_method = self._call_coroutine 2434 self._throw_method = self._throw_coroutine 2435 self._close_method = self._close_coroutine 2436 return 6 2437 elif callable(self.worker): 2438 # if __debug__: dlog('λ >>>\tCALLABLE') 2439 self._init_method = self._init_callable 2440 self._call_method = self._call_coroutine 2441 self._throw_method = self._throw_coroutine 2442 self._close_method = self._close_coroutine 2443 return 7 2444 else: 2445 raise TypeError(f'{self.worker} is neither an awaitable nor a wrapper for an awaitable') 2446 2447 def _init_coroutine(self, init_args, init_kwargs): 2448 try: 2449 self.in_run_state = True 2450 self.last_result = self.coro.send(None) 2451 except StopIteration as ex: 2452 self.in_run_state = False 2453 self.last_result = ex.value 2454 except: 2455 self.in_run_state = False 2456 raise 2457 2458 def _call_coroutine(self, *args, **kwargs): 2459 try: 2460 self.in_run_state = True 2461 self.last_result = self.coro.send(*args, **kwargs) 2462 except StopIteration as ex: 2463 self.in_run_state = False 2464 self.last_result = ex.value 2465 except: 2466 self.in_run_state = False 2467 raise 2468 2469 if sys.version_info >= (3, 12): 2470 # if (3, 12) <= PYTHON_VERSION_INT: 2471 def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None): 2472 try: 2473 self.in_run_state = True 2474 if ex_value is None: 2475 ex_value = ex_type() 2476 2477 self.last_result = self.coro.throw(ex_value) # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python. 2478 except StopIteration as ex: 2479 self.in_run_state = False 2480 self.last_result = ex.value 2481 except: 2482 self.in_run_state = False 2483 raise 2484 else: 2485 def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None): 2486 try: 2487 self.in_run_state = True 2488 self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback) # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python. 2489 except StopIteration as ex: 2490 self.in_run_state = False 2491 self.last_result = ex.value 2492 except: 2493 self.in_run_state = False 2494 raise 2495 2496 def _close_coroutine(self, *args, **kwargs): 2497 try: 2498 self.in_run_state = True 2499 self.last_result = self.coro.close() 2500 self.in_run_state = False 2501 except GeneratorExit as ex: 2502 self.in_run_state = False 2503 except: 2504 self.in_run_state = False 2505 raise 2506 2507 def _init_generator(self, init_args, init_kwargs): 2508 try: 2509 self.in_run_state = True 2510 self.last_result = next(self.coro) # ToDo: investigate how to provide an initial parameters 2511 except StopIteration as ex: 2512 self.in_run_state = False 2513 self.last_result = ex.value 2514 except: 2515 self.in_run_state = False 2516 raise 2517 2518 def _init_coroutinefunction(self, init_args, init_kwargs): 2519 self.coro = self.coro(self.interface, *init_args, **init_kwargs) 2520 self._init_coroutine(None, None) 2521 2522 def _init_generatorfunction(self, init_args, init_kwargs): 2523 self.coro = self.coro(self.interface, *init_args, **init_kwargs) 2524 self._init_generator(None, None) 2525 2526 def _init_asyncgenerator(self, init_args, init_kwargs): 2527 try: 2528 self.in_run_state = True 2529 entity = init_asyncgenerator(self.coro) 2530 result = entity.send(None) 2531 except StopIteration as ex: 2532 self.in_run_state = False 2533 result = ex.value 2534 except: 2535 self.in_run_state = False 2536 raise 2537 2538 self.last_result, exception = result 2539 if exception is not None: 2540 self.in_run_state = False 2541 if not isinstance(exception, StopAsyncIteration): 2542 raise exception 2543 2544 def _call_asyncgenerator(self, *args, **kwargs): 2545 try: 2546 self.in_run_state = True 2547 entity = call_asyncgenerator(self.coro, *args, **kwargs) 2548 result = entity.send(None) 2549 except StopIteration as ex: 2550 self.in_run_state = False 2551 result = ex.value 2552 except: 2553 self.in_run_state = False 2554 raise 2555 2556 self.last_result, exception = result 2557 if exception is not None: 2558 self.in_run_state = False 2559 if not isinstance(exception, StopAsyncIteration): 2560 raise exception 2561 2562 def _throw_asyncgenerator(self, *args, **kwargs): 2563 try: 2564 self.in_run_state = True 2565 entity = throw_asyncgenerator(self.coro, *args, **kwargs) 2566 result = entity.send(None) 2567 except StopIteration as ex: 2568 self.in_run_state = False 2569 result = ex.value 2570 except: 2571 self.in_run_state = False 2572 raise 2573 2574 self.last_result, exception = result 2575 if exception is not None: 2576 self.in_run_state = False 2577 if not isinstance(exception, StopAsyncIteration): 2578 raise exception 2579 2580 def _close_asyncgenerator(self): 2581 try: 2582 self.in_run_state = True 2583 entity = close_asyncgenerator(self.coro) 2584 result = entity.send(None) 2585 except StopIteration as ex: # TODO: check maybe `GeneratorExit` will be raised 2586 self.in_run_state = False 2587 result = ex.value 2588 except: 2589 self.in_run_state = False 2590 raise 2591 2592 self.last_result, exception = result 2593 if exception is not None: 2594 self.in_run_state = False 2595 if not isinstance(exception, StopAsyncIteration): 2596 raise exception 2597 2598 def _init_asyncgeneratorfunction(self, init_args, init_kwargs): 2599 try: 2600 self.in_run_state = True 2601 entity = init_asyncgeneratorfunction(self.coro, self.interface, *init_args, **init_kwargs) 2602 result = entity.send(None) 2603 except StopIteration as ex: 2604 self.in_run_state = False 2605 result = ex.value 2606 except: 2607 self.in_run_state = False 2608 raise 2609 2610 self.coro, self.last_result, exception = result 2611 if exception is not None: 2612 self.in_run_state = False 2613 if not isinstance(exception, StopAsyncIteration): 2614 raise exception 2615 2616 def _init_awaitable(self, init_args, init_kwargs): 2617 self.coro = awaitable_wrapper(self.coro) 2618 self._init_coroutine() 2619 2620 def _init_callable(self, init_args, init_kwargs): 2621 self.coro = callable_wrapper(self.coro, self.interface, *init_args, **init_kwargs) 2622 self._init_coroutine() 2623 2624 def destroy(self): 2625 self._make_coro_method = None 2626 self._make_interface = None 2627 self.in_run_state = None 2628 self.subtype = None 2629 return super().destroy() 2630 2631 2632class GreenletWorkerWrapper: 2633 def __init__(self, worker: Worker): 2634 self.worker = worker 2635 2636 def __call__(self, interface: 'InterfaceGreenlet', *args, **kwargs): 2637 result = None 2638 try: 2639 result = self.worker(interface, *args, **kwargs) 2640 finally: 2641 interface.register_result(result) # in case if GreenletExit will be raised by greenlet framework 2642 return result 2643 2644 def destroy(self): 2645 self.worker = None 2646 2647 2648def find_coro_type(entity) -> CoroType: 2649 if isinstance(entity, EntityArgsHolder): 2650 entity, args, kwargs = entity.entity_args_kwargs() 2651 2652 if inspect.iscoroutine(entity) or inspect.isgenerator(entity) or inspect.iscoroutinefunction(entity) or inspect.isgeneratorfunction(entity) or inspect.isasyncgen(entity) or inspect.isasyncgenfunction(entity) or inspect.isawaitable(entity): 2653 return CoroType.awaitable 2654 elif callable(entity): 2655 return CoroType.greenlet 2656 else: 2657 raise TypeError(f'{entity} is neither an awaitable nor a greenlet') 2658 2659 2660def coro_wrapper_factory(coro_type: CoroType, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs) -> 'CoroWrapperBase': 2661 if CoroType.auto == coro_type: 2662 coro_type = find_coro_type(worker) 2663 2664 if CoroType.greenlet == coro_type: 2665 if isinstance(worker, EntityArgsHolder): 2666 worker, args, kwargs = worker.entity_args_kwargs() 2667 worker = GreenletWorkerWrapper(worker) 2668 worker: EntityArgsHolderExplicit = EntityArgsHolderExplicit(worker, args, kwargs) 2669 return CoroWrapperGreenlet(loop, coro_id, worker, *args, **kwargs) 2670 else: 2671 worker = GreenletWorkerWrapper(worker) 2672 return CoroWrapperGreenlet(loop, coro_id, worker, *args, **kwargs) 2673 elif CoroType.awaitable == coro_type: 2674 return CoroWrapperAsyncAwait(loop, coro_id, worker, *args, **kwargs) 2675 else: 2676 raise NotImplementedError 2677 2678 2679class Interface: 2680 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2681 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2682 self._coro: CoroWrapperBase = coro # type: CoroWrapperBase 2683 self.coro_id: CoroID = coro.coro_id # type: CoroID 2684 self.in_work: bool = False 2685 self.ignored_by_watchdog: bool = False 2686 self.logger: logging.Logger = self._loop.logger 2687 self.log: logging.Logger = self.logger 2688 2689 @contextmanager 2690 def ignore_by_watchdog(self): 2691 current_state = self.ignored_by_watchdog 2692 self.ignored_by_watchdog = True 2693 try: 2694 yield 2695 finally: 2696 self.ignored_by_watchdog = current_state 2697 2698 def _normalize_call_args_kwargs(self, service_type: Union[ServiceType, 'ServiceRequest'], args, kwargs) -> Tuple[Type['Service'], Tuple, Dict]: 2699 if inspect.isclass(service_type): 2700 if issubclass(service_type, Service): 2701 return service_type, args, kwargs 2702 elif issubclass(service_type, ServiceRequest): 2703 # return service_type, args, kwargs 2704 return self._normalize_call_args_kwargs(service_type()(*args, **kwargs)) 2705 else: 2706 print(f'service_type of an unsupported type: {service_type}') 2707 raise TypeError(f'service_type of an unsupported type: {service_type}') 2708 elif isinstance(service_type, ServiceRequest): 2709 request: ServiceRequest = service_type 2710 service_type = request.default_service_type 2711 if service_type is None: 2712 print(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly') 2713 raise RuntimeError(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly') 2714 else: 2715 args, kwargs = args_kwargs(request, *args, **kwargs) 2716 return service_type, args, kwargs 2717 else: 2718 print(f'{service_type=}') 2719 raise ValueError(f'{service_type=}') 2720 2721 # def __call__(self, service_type: ServiceType, *args, **kwargs) -> Any: 2722 # """ 2723 # Should be called from inside coroutines only. 2724 # Will request some long running work to some service. 2725 2726 # :param coro_id: 2727 # :param service_type: 2728 # :param args: 2729 # :param kwargs: 2730 # """ 2731 2732 # response = self.__put_task_method(self.__coro, service_type, *args, *kwargs) 2733 # return response() 2734 2735 @overload 2736 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2737 2738 @overload 2739 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2740 2741 @overload 2742 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2743 2744 @overload 2745 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2746 2747 @overload 2748 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2749 2750 @overload 2751 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2752 2753 @overload 2754 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2755 2756 @overload 2757 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2758 2759 def __call__(self, service_type, *args, **kwargs): 2760 raise NotImplementedError 2761 2762 # !!! Must not be present in interface since leads to incorrect greenlets switches 2763 # def put_coro(self, coro_type: CoroType, coro_worker: Worker, *args, **kwargs) -> CoroID: 2764 # self.in_work = True 2765 # coro = self._loop.put_coro(coro_type, coro_worker, *args, **kwargs) 2766 # self.in_work = False 2767 # return coro.coro_id 2768 2769 def destroy(self): 2770 self._loop = None 2771 self._coro = None 2772 self.coro_id = None 2773 self.in_work = None 2774 2775 2776class InterfaceGreenlet(Interface): 2777 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2778 super(InterfaceGreenlet, self).__init__(loop, coro) 2779 self.result = None 2780 2781 @overload 2782 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2783 2784 @overload 2785 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2786 2787 @overload 2788 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2789 2790 @overload 2791 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2792 2793 @overload 2794 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2795 2796 @overload 2797 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2798 2799 @overload 2800 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2801 2802 @overload 2803 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2804 2805 def __call__(self, service_type, *args, **kwargs): 2806 """ 2807 Should be called from inside coroutines only. 2808 Will request some long running work to some service. 2809 2810 :param coro_id: 2811 :param service_type: 2812 :param args: 2813 :param kwargs: 2814 """ 2815 2816 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2817 2818 self.in_work = True 2819 # if __debug__: dlog(f'λ <= (request): <{func_info(self._coro.worker.worker, False)}>: {service_type}, {args}, {kwargs}') 2820 try: 2821 loop: CoroSchedulerGreenlet = self._loop 2822 response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs)) 2823 # TODO: we will switch to the next coro and do necessary preparations and reactions 2824 # if loop._coroutines_can_switch_directly: 2825 # if loop._is_current_coro_was_new_born: 2826 # ... 2827 # else: 2828 # ... 2829 # else: 2830 # response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs)) 2831 except AttributeError: 2832 # if __debug__: dlog(f'x λ: {id(self._loop)}, {self._loop}, root_coro: {self._loop.root_coro}; root_coro_iteration: {self._loop.root_coro_iteration}; current_loop: {self._loop.current_loop()}; current_interface: {self._loop.current_interface()}') 2833 raise 2834 # if __debug__: dlog(f'λ => (response): <{func_info(self._coro.worker.worker)}>: {repr(response)}') 2835 self.in_work = False 2836 if isinstance(response, Response): 2837 return response() 2838 2839 dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}") 2840 raise RuntimeError(f'Wrong type of response from the service: {type(response)}; {repr(response)}.') 2841 2842 def register_result(self, result: Any): 2843 self.result = result 2844 2845 def destroy(self): 2846 self.result = None 2847 return super().destroy() 2848 2849 2850class InterfaceAsyncAwait(Interface): 2851 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2852 super(InterfaceAsyncAwait, self).__init__(loop, coro) 2853 2854 @overload 2855 async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2856 2857 @overload 2858 async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2859 2860 @overload 2861 async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2862 2863 @overload 2864 async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2865 2866 @overload 2867 async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2868 2869 @overload 2870 async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2871 2872 @overload 2873 async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2874 2875 @overload 2876 async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2877 2878 async def __call__(self, service_type, *args, **kwargs): 2879 """ 2880 Should be called from inside coroutines only. 2881 Will request some long running work to some service. 2882 2883 :param coro_id: 2884 :param service_type: 2885 :param args: 2886 :param kwargs: 2887 """ 2888 2889 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2890 2891 self.in_work = True 2892 response = await yield_task_from_asyncawait(Request(self._coro, service_type, *args, **kwargs)) 2893 self.in_work = False 2894 if isinstance(response, Response): 2895 return response() 2896 2897 dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}") 2898 raise RuntimeError('Wrong type of response from the service') 2899 2900 2901class InterfaceFake(Interface): 2902 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2903 self._loop: CoroSchedulerBase = None # type: CoroSchedulerBase 2904 self._coro: CoroWrapperBase = None # type: CoroWrapperBase 2905 self.coro_id: CoroID = None # type: CoroID 2906 self.in_work: bool = False 2907 2908 @overload 2909 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2910 2911 @overload 2912 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2913 2914 @overload 2915 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2916 2917 @overload 2918 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2919 2920 @overload 2921 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2922 2923 @overload 2924 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2925 2926 @overload 2927 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2928 2929 @overload 2930 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2931 2932 def __call__(self, service_type, *args, **kwargs) -> Any: 2933 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2934 return None 2935 2936 2937class InterfaceFakeAsyncAwait(Interface): 2938 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2939 self._loop: CoroSchedulerBase = None # type: CoroSchedulerBase 2940 self._coro: CoroWrapperBase = None # type: CoroWrapperBase 2941 self.coro_id: CoroID = None # type: CoroID 2942 self.in_work: bool = False 2943 2944 @overload 2945 async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2946 2947 @overload 2948 async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2949 2950 @overload 2951 async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2952 2953 @overload 2954 async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2955 2956 @overload 2957 async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2958 2959 @overload 2960 async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2961 2962 @overload 2963 async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2964 2965 @overload 2966 async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2967 2968 async def __call__(self, service_type, *args, **kwargs): 2969 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2970 return None 2971 2972 2973class CallerCoroInfo: 2974 def __init__(self, coro: CoroWrapperBase): 2975 self.coro: CoroWrapperBase = coro 2976 self.coro_id: CoroID = coro.coro_id 2977 self.coro_type: Type = type(coro) 2978 2979 2980class WrongServiceRequestError(Exception): 2981 pass 2982 2983 2984class ServiceRequest: 2985 default_service_type: Optional[Type['Service']] = None 2986 default__request__type__: int = 0 2987 2988 def __init__(self): 2989 self.request_type: int = None # type: Optional[int] 2990 self.args: Optional[Tuple] = None # type: Optional[Tuple] 2991 self.kwargs: Optional[Dict] = None # type: Optional[Dict] 2992 self.provide_to_request_handler: bool = False 2993 2994 def _save(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest': 2995 self.request_type = __request__type__ 2996 self.args = args 2997 self.kwargs = kwargs 2998 return self 2999 3000 def _copy(self) -> 'ServiceRequest': 3001 raise NotImplementedError 3002 3003 def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest': 3004 return self._copy()._save(__request__type__, *args, **kwargs) 3005 3006 def __call__(self, *args: Any, **kwds: Any) -> 'ServiceRequest': 3007 """should call self._save() with some default __request__type__. Required for the Interface(Type[ServiceRequest], *args, **kwargs) call. Must return self 3008 3009 Returns: 3010 ServiceRequest: Must return self 3011 """ 3012 return self._save(self.default__request__type__, *args, **kwds) 3013 3014 def interface(self) -> Any: 3015 current_interface()(self.default_service_type, self) 3016 3017 i = interface 3018 3019 async def async_interface(self) -> Any: 3020 await current_interface()(self.default_service_type, self) 3021 3022 ai = async_interface 3023 3024 def __repr__(self): 3025 return f'<{self.__class__.__name__}(request_type: {self.request_type}, args: {self.args}, kwargs: {self.kwargs})>' 3026 3027 3028class TypedServiceRequest(ServiceRequest, Generic[ServiceResponseTypeVar]): 3029 pass 3030 3031 3032class ServiceRequestMethodMixin: 3033 def __init__(self, service: 'Service') -> None: 3034 self.service = service 3035 3036 def __call__(self, *args: Any, **kwds: Any) -> Any: 3037 raise NotImplementedError 3038 3039 def full_processing_iteration(self): 3040 raise NotImplementedError 3041 3042 def in_work(self) -> bool: 3043 raise NotImplementedError 3044 3045 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 3046 raise NotImplementedError 3047 3048 3049class DualImmediateProcessingServiceMixin: 3050 def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]: 3051 if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)): 3052 return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs) 3053 else: 3054 return self.single_task_registration_or_immediate_processing_single(*args, **kwargs) 3055 3056 def single_task_registration_or_immediate_processing_multiple(self, request: ServiceRequest 3057 ) -> Tuple[bool, Any, None]: 3058 return self.resolve_request(request) 3059 3060 def single_task_registration_or_immediate_processing_single( 3061 self, *args, **kwargs 3062 ) -> Tuple[bool, Optional[CoroID], Any]: 3063 raise NotImplementedError 3064 3065 3066ServiceProcessingResultExists = bool 3067ServiceProcessingResult = Any 3068ServiceProcessingException = Optional[BaseException] 3069ServiceProcessingResponse = Tuple[ServiceProcessingResultExists, ServiceProcessingResult, ServiceProcessingException] 3070 3071 3072class Service(Iterable): 3073 def __init__(self, loop: CoroSchedulerBase): 3074 super(Service, self).__init__() 3075 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 3076 # self._requests = list() # type: List[Request] 3077 self._responses: List[Response] = list() # type: List[Response] 3078 self.current_caller_coro_info: Optional[CallerCoroInfo] = None # type: Optional[CallerCoroInfo] 3079 self._request_workers = dict() # type: Dict[int, Callable] 3080 3081 def iteration(self) -> Optional[List[Response]]: 3082 # requests = self._requests 3083 # self._requests = type(self._requests)() 3084 self._responses = list() 3085 # for request in requests: 3086 # self.current_caller_coro_info = CallerCoroInfo(request.coro_id) 3087 # result_exists, result = \ 3088 # self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3089 # if result_exists: 3090 # self.register_response(request.coro_id, result) 3091 # self.current_caller_coro_info = None 3092 try: 3093 self.full_processing_iteration() 3094 except: 3095 # if __debug__: dlog(sys.exc_info()) 3096 raise 3097 return self._responses 3098 3099 def make_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None): 3100 return Response(coro_id, type(self), response, exception) 3101 3102 def register_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None): 3103 self._responses.append(self.make_response(coro_id, response, exception)) 3104 3105 def put_task(self, request: Request) -> Optional[Response]: 3106 self.current_caller_coro_info = CallerCoroInfo(request.coro) 3107 result_exists, result, exception = \ 3108 self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3109 self.current_caller_coro_info = None 3110 if result_exists or exception: 3111 return self.make_response(request.coro_id, result, exception) 3112 return None 3113 3114 def resolve_request(self, request: ServiceRequest): 3115 try: 3116 if request.provide_to_request_handler: 3117 return self._request_workers[request.request_type](request, *request.args, **request.kwargs) 3118 else: 3119 return self._request_workers[request.request_type](*request.args, **request.kwargs) 3120 except: 3121 return True, None, get_exception() 3122 3123 def try_resolve_request(self, *args, **kwargs): 3124 possible_request: Optional[ServiceRequest] = None 3125 if (len(args) == 1) and (len(kwargs) == 0): 3126 possible_request = args[0] 3127 elif (len(kwargs) == 1) and (len(args) == 0): 3128 possible_request = kwargs.pop('request', None) 3129 3130 if possible_request is not None: 3131 if isinstance(possible_request, ServiceRequest): 3132 return self.resolve_request(possible_request) 3133 3134 return None 3135 3136 def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> ServiceProcessingResponse: 3137 raise NotImplementedError 3138 3139 def full_processing_iteration(self): 3140 raise NotImplementedError 3141 3142 def in_work(self) -> bool: 3143 """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution 3144 3145 Raises: 3146 NotImplementedError: _description_ 3147 3148 Returns: 3149 bool: _description_ 3150 """ 3151 raise NotImplementedError 3152 3153 def in_forground_work(self) -> bool: 3154 return True 3155 3156 def thrifty_in_work(self, result: bool) -> bool: 3157 if result: 3158 return True 3159 else: 3160 self.make_dead() 3161 return False 3162 3163 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 3164 return False, None 3165 3166 def is_low_latency(self) -> bool: 3167 return False 3168 3169 def make_live(self): 3170 self._loop.make_service_live_fast(type(self), self.is_low_latency()) 3171 3172 def make_dead(self): 3173 self._loop.make_service_dead_fast(type(self)) 3174 3175 @staticmethod 3176 def service_id_impl(): 3177 return None 3178 3179 @staticmethod 3180 def service_id(service_type: Type): 3181 service_id = service_type.service_id_impl() 3182 if service_id is None: 3183 service_id = service_type.__name__ 3184 3185 return service_id 3186 3187 def destroy(self): 3188 pass 3189 3190 3191class TypedService(Service, Generic[ServiceResponseTypeVar]): 3192 pass
145def cs_coro(coro_worker: Callable[CoroParams, CoroResult]) -> Callable[Concatenate['Interface', CoroParams], CoroResult]: 146 """Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter) 147 148 Args: 149 coro_worker (Callable): _description_ 150 """ 151 if is_async(coro_worker): 152 async def wrapper(i: Interface, *args, **kwargs): 153 return await coro_worker(*args, **kwargs) 154 155 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 156 update_wrapper(wrapper, coro_worker) 157 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 158 # wrapper.__name__ = coro_worker.__name__ 159 wrapper: coro_worker 160 return wrapper 161 else: 162 def wrapper(i: Interface, *args, **kwargs): 163 return coro_worker(*args, **kwargs) 164 165 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 166 update_wrapper(wrapper, coro_worker) 167 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 168 # wrapper.__name__ = coro_worker.__name__ 169 wrapper: coro_worker 170 return wrapper
Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter)
Args: coro_worker (Callable): _description_
173def cs_acoro(coro_aworker: Callable[CoroParams, Awaitable[CoroResult]]) -> Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]: 174 """Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter) 175 176 Args: 177 coro_aworker (Callable): _description_ 178 """ 179 if is_async(coro_aworker): 180 async def wrapper(i: Interface, *args, **kwargs): 181 return await coro_aworker(*args, **kwargs) 182 183 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 184 update_wrapper(wrapper, coro_aworker) 185 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 186 # wrapper.__name__ = coro_aworker.__name__ 187 wrapper: coro_aworker 188 return wrapper 189 else: 190 def wrapper(i: Interface, *args, **kwargs): 191 return coro_aworker(*args, **kwargs) 192 193 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 194 update_wrapper(wrapper, coro_aworker) 195 wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 196 wrapper.__name__ = coro_aworker.__name__ 197 wrapper: coro_aworker 198 return wrapper
Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter)
Args: coro_aworker (Callable): _description_
201def cs_callable(coro_worker: Callable[Concatenate['Interface', CoroParams], CoroResult]) -> Callable[CoroParams, CoroResult]: 202 """Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine 203 204 Args: 205 coro_worker (Callable): _description_ 206 """ 207 if is_async(coro_worker): 208 async def wrapper(*args, **kwargs): 209 i: Interface = current_interface() 210 return await coro_worker(i, *args, **kwargs) 211 212 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 213 update_wrapper(wrapper, coro_worker) 214 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 215 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 216 # wrapper.__name__ = coro_worker.__name__ 217 wrapper: coro_worker 218 return wrapper 219 else: 220 def wrapper(*args, **kwargs): 221 i: Interface = current_interface() 222 return coro_worker(i, *args, **kwargs) 223 224 coro_worker_sign: inspect.Signature = inspect.signature(coro_worker) 225 update_wrapper(wrapper, coro_worker) 226 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 227 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 228 # wrapper.__name__ = coro_worker.__name__ 229 wrapper: coro_worker 230 return wrapper
Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine
Args: coro_worker (Callable): _description_
233def cs_acallable(coro_aworker: Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]) -> Callable[CoroParams, Awaitable[CoroResult]]: 234 """Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine 235 236 Args: 237 coro_aworker (Callable): _description_ 238 """ 239 if is_async(coro_aworker): 240 async def wrapper(*args, **kwargs): 241 i: Interface = current_interface() 242 return await coro_aworker(i, *args, **kwargs) 243 244 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 245 update_wrapper(wrapper, coro_aworker) 246 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 247 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 248 # wrapper.__name__ = coro_aworker.__name__ 249 wrapper: coro_aworker 250 return wrapper 251 else: 252 def wrapper(*args, **kwargs): 253 i: Interface = current_interface() 254 return coro_aworker(i, *args, **kwargs) 255 256 coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker) 257 update_wrapper(wrapper, coro_aworker) 258 # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation) 259 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 260 # wrapper.__name__ = coro_aworker.__name__ 261 wrapper: coro_aworker 262 return wrapper
Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine
Args: coro_aworker (Callable): _description_
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
457def loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType': 458 if backup_scheduler is not None: 459 if not isinstance(backup_scheduler, CoroScheduler): 460 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 461 462 loop = CoroScheduler.current_loop() 463 if loop is None: 464 # Outside the loop 465 loop = get_available_coro_scheduler() 466 if loop is None: 467 loop = backup_scheduler 468 469 if loop is None: 470 raise CoroSchedulerContextIsNotAvailable 471 472 return loop
475def get_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']: 476 if backup_scheduler is not None: 477 if not isinstance(backup_scheduler, CoroScheduler): 478 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 479 480 loop = CoroScheduler.current_loop() 481 if loop is None: 482 # Outside the loop 483 loop = get_available_coro_scheduler() 484 if loop is None: 485 loop = backup_scheduler 486 487 return loop
490def loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType': 491 loop = explicit_scheduler 492 current_loop = CoroScheduler.current_loop() 493 if loop is None: 494 loop = current_loop 495 if loop is None: 496 # Outside the loop 497 loop = get_available_coro_scheduler() 498 else: 499 if not isinstance(loop, CoroScheduler): 500 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 501 502 if loop is None: 503 raise CoroSchedulerContextIsNotAvailable 504 505 return loop
508def get_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']: 509 loop = explicit_scheduler 510 current_loop = CoroScheduler.current_loop() 511 if loop is None: 512 loop = current_loop 513 if loop is None: 514 # Outside the loop 515 loop = get_available_coro_scheduler() 516 else: 517 if not isinstance(loop, CoroScheduler): 518 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 519 520 return loop
524def interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]: 525 if backup_scheduler is not None: 526 if not isinstance(backup_scheduler, CoroScheduler): 527 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 528 529 loop = CoroScheduler.current_loop() 530 if loop is None: 531 # Outside the loop 532 interface = None 533 loop = get_available_coro_scheduler() 534 if loop is None: 535 loop = backup_scheduler 536 else: 537 # In the loop (in coroutine or in the service) 538 interface = loop.current_interface() 539 540 if loop is None: 541 raise CoroSchedulerContextIsNotAvailable 542 543 if interface is None: 544 raise InterfaceIsNotAvailableError 545 546 coro_alive: bool = False 547 if interface is not None: 548 if interface._coro: 549 coro_alive = True 550 551 if not coro_alive: 552 raise CurrentCoroIsNotAliveError 553 554 return loop, interface, coro_alive
557def get_interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 558 if backup_scheduler is not None: 559 if not isinstance(backup_scheduler, CoroScheduler): 560 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 561 562 loop = CoroScheduler.current_loop() 563 if loop is None: 564 # Outside the loop 565 interface = None 566 loop = get_available_coro_scheduler() 567 if loop is None: 568 loop = backup_scheduler 569 else: 570 # In the loop (in coroutine or in the service) 571 interface = loop.current_interface() 572 573 coro_alive: bool = False 574 if interface is not None: 575 if interface._coro: 576 coro_alive = True 577 578 return loop, interface, coro_alive
581def interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]: 582 loop = explicit_scheduler 583 current_loop = CoroScheduler.current_loop() 584 interface = None 585 if loop is None: 586 loop = current_loop 587 if loop is None: 588 # Outside the loop 589 loop = get_available_coro_scheduler() 590 else: 591 # In the loop (in coroutine or in the service) 592 interface = loop.current_interface() 593 else: 594 if isinstance(loop, CoroScheduler): 595 if loop is current_loop: 596 interface = loop.current_interface() 597 else: 598 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 599 600 if loop is None: 601 raise CoroSchedulerContextIsNotAvailable 602 603 if interface is None: 604 raise InterfaceIsNotAvailableError 605 606 coro_alive: bool = False 607 if interface is not None: 608 if interface._coro: 609 coro_alive = True 610 611 if not coro_alive: 612 raise CurrentCoroIsNotAliveError 613 614 return loop, interface, coro_alive
617def get_interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 618 loop = explicit_scheduler 619 current_loop = CoroScheduler.current_loop() 620 interface = None 621 if loop is None: 622 loop = current_loop 623 if loop is None: 624 # Outside the loop 625 loop = get_available_coro_scheduler() 626 else: 627 # In the loop (in coroutine or in the service) 628 interface = loop.current_interface() 629 else: 630 if isinstance(loop, CoroScheduler): 631 if loop is current_loop: 632 interface = loop.current_interface() 633 else: 634 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 635 636 coro_alive: bool = False 637 if interface is not None: 638 if interface._coro: 639 coro_alive = True 640 641 return loop, interface, coro_alive
644def interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple['CoroSchedulerType', 'Interface', bool]: 645 loop = explicit_scheduler 646 interface = None 647 if loop is None: 648 raise CoroSchedulerContextIsNotAvailable 649 else: 650 if isinstance(loop, CoroScheduler): 651 if loop is CoroScheduler.current_loop(): 652 interface = loop.current_interface() 653 else: 654 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 655 656 if interface is None: 657 raise InterfaceIsNotAvailableError 658 659 coro_alive: bool = False 660 if interface is not None: 661 if interface._coro: 662 coro_alive = True 663 664 if not coro_alive: 665 raise CurrentCoroIsNotAliveError 666 667 return loop, interface, coro_alive
670def get_interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]: 671 loop = explicit_scheduler 672 interface = None 673 if loop is not None: 674 if isinstance(loop, CoroScheduler): 675 if loop is CoroScheduler.current_loop(): 676 interface = loop.current_interface() 677 else: 678 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 679 680 coro_alive: bool = False 681 if interface is not None: 682 if interface._coro: 683 coro_alive = True 684 685 return loop, interface, coro_alive
690def service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 691 if backup_scheduler is not None: 692 if not isinstance(backup_scheduler, CoroScheduler): 693 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 694 695 loop = CoroScheduler.current_loop() 696 if loop is None: 697 # Outside the loop 698 loop = get_available_coro_scheduler() 699 if loop is None: 700 loop = backup_scheduler 701 702 if loop is None: 703 raise CoroSchedulerContextIsNotAvailable 704 705 return loop.get_service_instance(service_type)
708def get_service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 709 if backup_scheduler is not None: 710 if not isinstance(backup_scheduler, CoroScheduler): 711 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 712 713 loop = CoroScheduler.current_loop() 714 if loop is None: 715 # Outside the loop 716 loop = get_available_coro_scheduler() 717 if loop is None: 718 loop = backup_scheduler 719 720 if loop is None: 721 return None 722 else: 723 return loop.get_service_instance(service_type)
726def service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 727 loop = explicit_scheduler 728 current_loop = CoroScheduler.current_loop() 729 if loop is None: 730 loop = current_loop 731 if loop is None: 732 # Outside the loop 733 loop = get_available_coro_scheduler() 734 else: 735 if not isinstance(loop, CoroScheduler): 736 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 737 738 if loop is None: 739 raise CoroSchedulerContextIsNotAvailable 740 741 return loop.get_service_instance(service_type)
744def get_service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 745 loop = explicit_scheduler 746 current_loop = CoroScheduler.current_loop() 747 if loop is None: 748 loop = current_loop 749 if loop is None: 750 # Outside the loop 751 loop = get_available_coro_scheduler() 752 else: 753 if not isinstance(loop, CoroScheduler): 754 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 755 756 if loop is None: 757 return None 758 else: 759 return loop.get_service_instance(service_type)
763def service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 764 if backup_scheduler is not None: 765 if not isinstance(backup_scheduler, CoroScheduler): 766 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 767 768 loop = CoroScheduler.current_loop() 769 if loop is None: 770 # Outside the loop 771 loop = get_available_coro_scheduler() 772 if loop is None: 773 loop = backup_scheduler 774 775 if loop is None: 776 raise CoroSchedulerContextIsNotAvailable 777 778 return loop.get_service_instance_fast(service_type)
781def get_service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 782 if backup_scheduler is not None: 783 if not isinstance(backup_scheduler, CoroScheduler): 784 raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}') 785 786 loop = CoroScheduler.current_loop() 787 if loop is None: 788 # Outside the loop 789 loop = get_available_coro_scheduler() 790 if loop is None: 791 loop = backup_scheduler 792 793 if loop is None: 794 return None 795 else: 796 return loop.get_service_instance_fast(service_type)
799def service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service': 800 loop = explicit_scheduler 801 current_loop = CoroScheduler.current_loop() 802 if loop is None: 803 loop = current_loop 804 if loop is None: 805 # Outside the loop 806 loop = get_available_coro_scheduler() 807 else: 808 if not isinstance(loop, CoroScheduler): 809 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 810 811 if loop is None: 812 raise CoroSchedulerContextIsNotAvailable 813 814 return loop.get_service_instance_fast(service_type)
817def get_service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']: 818 loop = explicit_scheduler 819 current_loop = CoroScheduler.current_loop() 820 if loop is None: 821 loop = current_loop 822 if loop is None: 823 # Outside the loop 824 loop = get_available_coro_scheduler() 825 else: 826 if not isinstance(loop, CoroScheduler): 827 raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}') 828 829 if loop is None: 830 return None 831 else: 832 return loop.get_service_instance_fast(service_type)
An enumeration.
Inherited Members
- enum.Enum
- name
- value
842class ExplicitWorker: 843 def __init__(self, coro_type: CoroType, worker: Worker) -> None: 844 self.coro_type: CoroType = coro_type 845 self.worker: Worker = worker
1898class CoroSchedulerGreenlet(CoroSchedulerBase): 1899 def __init__(self, logger: Optional[logging.Logger] = None): 1900 super().__init__(logger) 1901 self.root_coro_loop = greenlet(self._loop_imp) # type: Coro 1902 # self.root_coro_iteration = greenlet(self._iteration_imp) # type: Coro 1903 self.root_coro_iteration = greenlet(self._iter_wrapper) # type: Coro 1904 # self._root_coro = None # type: Optional[Coro] 1905 self.root_coro = None # type: Optional[Coro] 1906 self._coroutines_can_switch_directly = True 1907 1908 def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType: 1909 return coro_type or CoroType.auto 1910 1911 def loop(self): 1912 if self.need_to_log_loop_start_and_end: 1913 self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...') 1914 1915 self.root_coro = self.root_coro_loop 1916 1917 # if __debug__: dlog('Switch to root_coro') 1918 self.root_coro.switch() 1919 # if __debug__: dlog('Switch from root_coro') 1920 self.root_coro = None 1921 self.root_coro_loop = greenlet(self._loop_imp) 1922 1923 def iteration(self): 1924 # global _debug_log_counter 1925 # last_debug_log_counter = _debug_log_counter 1926 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1927 1928 self.root_coro = self.root_coro_iteration 1929 in_work = False 1930 try: 1931 # if __debug__: dlog('Switch to root_coro (iteration)') 1932 in_work = self.root_coro.switch(True) 1933 # if __debug__: dlog('Switch from root_coro (iteration)') 1934 except GreenletExit: 1935 self.stop_iteration() 1936 # if __debug__: dlog('Switch from root_coro (GreenletExit)') 1937 except: 1938 self.stop_iteration() 1939 # if __debug__: dlog('Switch from root_coro (Exception):') 1940 # if __debug__: dlog(sys.exc_info()) 1941 raise 1942 finally: 1943 self.root_coro = None 1944 # if __debug__: 1945 # if last_debug_log_counter < _debug_log_counter: 1946 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1947 1948 return in_work 1949 1950 def stop_iteration(self): 1951 try: 1952 in_work = self.root_coro.switch(False) 1953 except GreenletExit: 1954 self.stop_iteration() 1955 # if __debug__: dlog('Stop root_coro (GreenletExit)') 1956 except: 1957 self.stop_iteration() 1958 # if __debug__: dlog('Stop root_coro (Exception):') 1959 # if __debug__: dlog(sys.exc_info()) 1960 raise 1961 finally: 1962 self.root_coro = None 1963 self.root_coro_iteration = greenlet(self._iter_wrapper) 1964 1965 def _iter_wrapper(self, proceed: bool=True): 1966 try: 1967 while proceed: 1968 self._iteration_imp() 1969 proceed = greenlet.getcurrent().parent.switch(self.in_work()) 1970 except CoroSchedulerDestroyRequestedException: 1971 pass 1972 finally: 1973 self.destroy()
1899 def __init__(self, logger: Optional[logging.Logger] = None): 1900 super().__init__(logger) 1901 self.root_coro_loop = greenlet(self._loop_imp) # type: Coro 1902 # self.root_coro_iteration = greenlet(self._iteration_imp) # type: Coro 1903 self.root_coro_iteration = greenlet(self._iter_wrapper) # type: Coro 1904 # self._root_coro = None # type: Optional[Coro] 1905 self.root_coro = None # type: Optional[Coro] 1906 self._coroutines_can_switch_directly = True
1911 def loop(self): 1912 if self.need_to_log_loop_start_and_end: 1913 self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...') 1914 1915 self.root_coro = self.root_coro_loop 1916 1917 # if __debug__: dlog('Switch to root_coro') 1918 self.root_coro.switch() 1919 # if __debug__: dlog('Switch from root_coro') 1920 self.root_coro = None 1921 self.root_coro_loop = greenlet(self._loop_imp)
1923 def iteration(self): 1924 # global _debug_log_counter 1925 # last_debug_log_counter = _debug_log_counter 1926 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1927 1928 self.root_coro = self.root_coro_iteration 1929 in_work = False 1930 try: 1931 # if __debug__: dlog('Switch to root_coro (iteration)') 1932 in_work = self.root_coro.switch(True) 1933 # if __debug__: dlog('Switch from root_coro (iteration)') 1934 except GreenletExit: 1935 self.stop_iteration() 1936 # if __debug__: dlog('Switch from root_coro (GreenletExit)') 1937 except: 1938 self.stop_iteration() 1939 # if __debug__: dlog('Switch from root_coro (Exception):') 1940 # if __debug__: dlog(sys.exc_info()) 1941 raise 1942 finally: 1943 self.root_coro = None 1944 # if __debug__: 1945 # if last_debug_log_counter < _debug_log_counter: 1946 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1947 1948 return in_work
should return False if ready to stop looping :return:
1950 def stop_iteration(self): 1951 try: 1952 in_work = self.root_coro.switch(False) 1953 except GreenletExit: 1954 self.stop_iteration() 1955 # if __debug__: dlog('Stop root_coro (GreenletExit)') 1956 except: 1957 self.stop_iteration() 1958 # if __debug__: dlog('Stop root_coro (Exception):') 1959 # if __debug__: dlog(sys.exc_info()) 1960 raise 1961 finally: 1962 self.root_coro = None 1963 self.root_coro_iteration = greenlet(self._iter_wrapper)
Inherited Members
- CoroSchedulerBase
- in_iteration
- services
- live_services
- live_low_latency_services
- requests
- responses
- new_born_coroutines
- coroutines
- coro_counter
- services_in_work
- services_in_foreground_work
- services_in_active_state
- services_in_active_state_list
- time_left_before_next_event
- coro_on_del_handlers
- current_coro_interface
- current_coro_wrapper
- suppress_coro_exceptions
- iteration_index
- context_switches
- current_coro_start_time
- coro_execution_time
- coro_longest_execution_time
- loop_iteration_start_time
- need_to_measure_loop_iteration_time
- need_to_measure_coro_time
- need_to_gather_coro_history
- permitted_use_put_coro_from_coro
- get_coro_start_time
- get_loop_iteration_start_time
- coro_history_gatherer
- on_woke_up_callback
- coro_on_start_handlers
- global_on_start_handlers_turned_on
- execute_global_on_start_handlers
- coro_workers_history
- coro_full_history
- run_low_latency_services
- services_impostrors
- keep_coro_execution_time_between_iterations
- keep_coro_longest_execution_time_between_iterations
- keep_coro_workers_history_between_iterations
- keep_coro_full_history_between_iterations
- loop_iteration_delta_time
- suppress_warnings_about_responses_to_not_existant_coroutines
- use_internal_sleep
- high_cpu_utilisation_mode
- foreground_coro_num
- on_idle_handlers
- idle_managers_num
- on_wrong_request
- get_service_by_type
- get_service_instance_fast
- get_service_instance
- make_service_live_fast
- make_service_live
- make_service_dead_fast
- make_service_dead
- sliding_window
- need_to_log_loop_start_and_end
- on_destroyed_handlers
- get_entity_stats
- log_exc
- get_service_instance_fast_impl
- get_service_instance_impl
- make_service_live_fast_impl
- make_service_live_impl
- make_service_dead_fast_impl
- make_service_dead_impl
- get_service_instance_fast_impostor_impl
- get_service_instance_impostor_impl
- make_service_live_fast_impostor_impl
- make_service_live_impostor_impl
- make_service_dead_fast_impostor_impl
- make_service_dead_impostor_impl
- set_coro_time_measurement
- set_coro_history_gathering
- set_loop_iteration_time_measurement
- put_coro_fast
- put_coro
- register_service
- is_service_registered
- unregister_service
- destroy_services
- get_service_by_type_impl
- get_service_by_type_impostor_impl
- register_service_impostor
- find_new_born_coroutine
- get_coro
- purify_worker
- set_global_on_start_handlers
- add_on_global_on_start_handler
- process_coro_exit_status
- kill_coro
- throw_coro
- forget_coro_by_id
- find_coro_by_id
- del_coro_by_id
- kill_coro_by_id
- throw_coro_by_id
- request_coro_throw_by_id
- request_coro_close_by_id
- turn_on_embedded_mode
- add_global_on_coro_del_handler
- restore_global_state
- current_loop
- current_interface
- current_wrapper
- destroy_all_coroutines
- destroy_new_born_coroutines
- destroy_coroutines
- destroy
- in_work
- is_awake
- is_idle
- next_event_after
1976class CoroSchedulerAwaitable(CoroSchedulerBase): 1977 def __init__(self, logger: Optional[logging.Logger] = None): 1978 super().__init__(logger) 1979 self.root_coro = None # type: Optional[Coro] 1980 1981 def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType: 1982 return CoroType.awaitable 1983 1984 def loop(self): 1985 if self.need_to_log_loop_start_and_end: 1986 self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...') 1987 1988 self._loop_imp() 1989 1990 def iteration(self): 1991 # global _debug_log_counter 1992 # last_debug_log_counter = _debug_log_counter 1993 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1994 1995 need_to_destroy = False 1996 try: 1997 self._iteration_imp() 1998 return self.in_work() 1999 except CoroSchedulerDestroyRequestedException: 2000 need_to_destroy = True 2001 except: 2002 need_to_destroy = True 2003 raise 2004 finally: 2005 # if __debug__: 2006 # if last_debug_log_counter < _debug_log_counter: 2007 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 2008 2009 if need_to_destroy: 2010 self.destroy()
1990 def iteration(self): 1991 # global _debug_log_counter 1992 # last_debug_log_counter = _debug_log_counter 1993 # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 1994 1995 need_to_destroy = False 1996 try: 1997 self._iteration_imp() 1998 return self.in_work() 1999 except CoroSchedulerDestroyRequestedException: 2000 need_to_destroy = True 2001 except: 2002 need_to_destroy = True 2003 raise 2004 finally: 2005 # if __debug__: 2006 # if last_debug_log_counter < _debug_log_counter: 2007 # if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}') 2008 2009 if need_to_destroy: 2010 self.destroy()
should return False if ready to stop looping :return:
Inherited Members
- CoroSchedulerBase
- in_iteration
- services
- live_services
- live_low_latency_services
- requests
- responses
- new_born_coroutines
- coroutines
- coro_counter
- services_in_work
- services_in_foreground_work
- services_in_active_state
- services_in_active_state_list
- time_left_before_next_event
- coro_on_del_handlers
- current_coro_interface
- current_coro_wrapper
- suppress_coro_exceptions
- iteration_index
- context_switches
- current_coro_start_time
- coro_execution_time
- coro_longest_execution_time
- loop_iteration_start_time
- need_to_measure_loop_iteration_time
- need_to_measure_coro_time
- need_to_gather_coro_history
- permitted_use_put_coro_from_coro
- get_coro_start_time
- get_loop_iteration_start_time
- coro_history_gatherer
- on_woke_up_callback
- coro_on_start_handlers
- global_on_start_handlers_turned_on
- execute_global_on_start_handlers
- coro_workers_history
- coro_full_history
- run_low_latency_services
- services_impostrors
- keep_coro_execution_time_between_iterations
- keep_coro_longest_execution_time_between_iterations
- keep_coro_workers_history_between_iterations
- keep_coro_full_history_between_iterations
- loop_iteration_delta_time
- suppress_warnings_about_responses_to_not_existant_coroutines
- use_internal_sleep
- high_cpu_utilisation_mode
- foreground_coro_num
- on_idle_handlers
- idle_managers_num
- on_wrong_request
- get_service_by_type
- get_service_instance_fast
- get_service_instance
- make_service_live_fast
- make_service_live
- make_service_dead_fast
- make_service_dead
- sliding_window
- need_to_log_loop_start_and_end
- on_destroyed_handlers
- get_entity_stats
- log_exc
- get_service_instance_fast_impl
- get_service_instance_impl
- make_service_live_fast_impl
- make_service_live_impl
- make_service_dead_fast_impl
- make_service_dead_impl
- get_service_instance_fast_impostor_impl
- get_service_instance_impostor_impl
- make_service_live_fast_impostor_impl
- make_service_live_impostor_impl
- make_service_dead_fast_impostor_impl
- make_service_dead_impostor_impl
- set_coro_time_measurement
- set_coro_history_gathering
- set_loop_iteration_time_measurement
- put_coro_fast
- put_coro
- register_service
- is_service_registered
- unregister_service
- destroy_services
- get_service_by_type_impl
- get_service_by_type_impostor_impl
- register_service_impostor
- find_new_born_coroutine
- get_coro
- purify_worker
- set_global_on_start_handlers
- add_on_global_on_start_handler
- process_coro_exit_status
- kill_coro
- throw_coro
- forget_coro_by_id
- find_coro_by_id
- del_coro_by_id
- kill_coro_by_id
- throw_coro_by_id
- request_coro_throw_by_id
- request_coro_close_by_id
- turn_on_embedded_mode
- add_global_on_coro_del_handler
- restore_global_state
- current_loop
- current_interface
- current_wrapper
- destroy_all_coroutines
- destroy_new_born_coroutines
- destroy_coroutines
- destroy
- in_work
- is_awake
- is_idle
- next_event_after
2039@contextmanager 2040def around_await(): 2041 """ 2042 It is very bad idea to await from inside the greenlet since it might lead to problems with stack 2043 state in an outside awaitable code and it's loop. 2044 Must be run around `await ...` from inside Coro or Service when running from inside external async loop 2045 :return: 2046 """ 2047 loop = _current_coro_scheduler.scheduler 2048 try: 2049 yield 2050 finally: 2051 _current_coro_scheduler.scheduler = loop
It is very bad idea to await from inside the greenlet since it might lead to problems with stack
state in an outside awaitable code and it's loop.
Must be run around await ...
from inside Coro or Service when running from inside external async loop
:return:
2054class Request: 2055 def __init__(self, coro: 'CoroWrapperBase', service_type: ServiceType, *args, **kwargs): 2056 self.coro = coro 2057 self.coro_id = coro.coro_id 2058 self.service_type = service_type 2059 self.args = args 2060 self.kwargs = kwargs 2061 2062 def __repr__(self): 2063 return f'<{self.__class__.__name__}(coro: {self.coro}, coro_id: {self.coro_id}, service_type: {self.service_type}, args: {self.args}, kwargs: {self.kwargs})>'
2066class Response: 2067 def __init__(self, coro_id: CoroID, service_type: ServiceType, response: Any, exception: Optional[BaseException]=None): 2068 self.coro_id = coro_id 2069 self.service_type = service_type 2070 self.response = response 2071 self.exception = exception 2072 2073 def __call__(self) -> Any: 2074 if self.exception: 2075 raise self.exception 2076 return self.response 2077 2078 def __repr__(self): 2079 return f'<{self.__class__.__name__}(coro_id: {self.coro_id}, service_type: {self.service_type}, response: {self.response}, exception: {self.exception})>'
2680class Interface: 2681 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2682 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2683 self._coro: CoroWrapperBase = coro # type: CoroWrapperBase 2684 self.coro_id: CoroID = coro.coro_id # type: CoroID 2685 self.in_work: bool = False 2686 self.ignored_by_watchdog: bool = False 2687 self.logger: logging.Logger = self._loop.logger 2688 self.log: logging.Logger = self.logger 2689 2690 @contextmanager 2691 def ignore_by_watchdog(self): 2692 current_state = self.ignored_by_watchdog 2693 self.ignored_by_watchdog = True 2694 try: 2695 yield 2696 finally: 2697 self.ignored_by_watchdog = current_state 2698 2699 def _normalize_call_args_kwargs(self, service_type: Union[ServiceType, 'ServiceRequest'], args, kwargs) -> Tuple[Type['Service'], Tuple, Dict]: 2700 if inspect.isclass(service_type): 2701 if issubclass(service_type, Service): 2702 return service_type, args, kwargs 2703 elif issubclass(service_type, ServiceRequest): 2704 # return service_type, args, kwargs 2705 return self._normalize_call_args_kwargs(service_type()(*args, **kwargs)) 2706 else: 2707 print(f'service_type of an unsupported type: {service_type}') 2708 raise TypeError(f'service_type of an unsupported type: {service_type}') 2709 elif isinstance(service_type, ServiceRequest): 2710 request: ServiceRequest = service_type 2711 service_type = request.default_service_type 2712 if service_type is None: 2713 print(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly') 2714 raise RuntimeError(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly') 2715 else: 2716 args, kwargs = args_kwargs(request, *args, **kwargs) 2717 return service_type, args, kwargs 2718 else: 2719 print(f'{service_type=}') 2720 raise ValueError(f'{service_type=}') 2721 2722 # def __call__(self, service_type: ServiceType, *args, **kwargs) -> Any: 2723 # """ 2724 # Should be called from inside coroutines only. 2725 # Will request some long running work to some service. 2726 2727 # :param coro_id: 2728 # :param service_type: 2729 # :param args: 2730 # :param kwargs: 2731 # """ 2732 2733 # response = self.__put_task_method(self.__coro, service_type, *args, *kwargs) 2734 # return response() 2735 2736 @overload 2737 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2738 2739 @overload 2740 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2741 2742 @overload 2743 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2744 2745 @overload 2746 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2747 2748 @overload 2749 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2750 2751 @overload 2752 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2753 2754 @overload 2755 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2756 2757 @overload 2758 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2759 2760 def __call__(self, service_type, *args, **kwargs): 2761 raise NotImplementedError 2762 2763 # !!! Must not be present in interface since leads to incorrect greenlets switches 2764 # def put_coro(self, coro_type: CoroType, coro_worker: Worker, *args, **kwargs) -> CoroID: 2765 # self.in_work = True 2766 # coro = self._loop.put_coro(coro_type, coro_worker, *args, **kwargs) 2767 # self.in_work = False 2768 # return coro.coro_id 2769 2770 def destroy(self): 2771 self._loop = None 2772 self._coro = None 2773 self.coro_id = None 2774 self.in_work = None
2681 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2682 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2683 self._coro: CoroWrapperBase = coro # type: CoroWrapperBase 2684 self.coro_id: CoroID = coro.coro_id # type: CoroID 2685 self.in_work: bool = False 2686 self.ignored_by_watchdog: bool = False 2687 self.logger: logging.Logger = self._loop.logger 2688 self.log: logging.Logger = self.logger
2777class InterfaceGreenlet(Interface): 2778 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2779 super(InterfaceGreenlet, self).__init__(loop, coro) 2780 self.result = None 2781 2782 @overload 2783 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2784 2785 @overload 2786 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2787 2788 @overload 2789 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2790 2791 @overload 2792 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2793 2794 @overload 2795 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2796 2797 @overload 2798 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2799 2800 @overload 2801 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2802 2803 @overload 2804 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2805 2806 def __call__(self, service_type, *args, **kwargs): 2807 """ 2808 Should be called from inside coroutines only. 2809 Will request some long running work to some service. 2810 2811 :param coro_id: 2812 :param service_type: 2813 :param args: 2814 :param kwargs: 2815 """ 2816 2817 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2818 2819 self.in_work = True 2820 # if __debug__: dlog(f'λ <= (request): <{func_info(self._coro.worker.worker, False)}>: {service_type}, {args}, {kwargs}') 2821 try: 2822 loop: CoroSchedulerGreenlet = self._loop 2823 response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs)) 2824 # TODO: we will switch to the next coro and do necessary preparations and reactions 2825 # if loop._coroutines_can_switch_directly: 2826 # if loop._is_current_coro_was_new_born: 2827 # ... 2828 # else: 2829 # ... 2830 # else: 2831 # response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs)) 2832 except AttributeError: 2833 # if __debug__: dlog(f'x λ: {id(self._loop)}, {self._loop}, root_coro: {self._loop.root_coro}; root_coro_iteration: {self._loop.root_coro_iteration}; current_loop: {self._loop.current_loop()}; current_interface: {self._loop.current_interface()}') 2834 raise 2835 # if __debug__: dlog(f'λ => (response): <{func_info(self._coro.worker.worker)}>: {repr(response)}') 2836 self.in_work = False 2837 if isinstance(response, Response): 2838 return response() 2839 2840 dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}") 2841 raise RuntimeError(f'Wrong type of response from the service: {type(response)}; {repr(response)}.') 2842 2843 def register_result(self, result: Any): 2844 self.result = result 2845 2846 def destroy(self): 2847 self.result = None 2848 return super().destroy()
Inherited Members
2649def find_coro_type(entity) -> CoroType: 2650 if isinstance(entity, EntityArgsHolder): 2651 entity, args, kwargs = entity.entity_args_kwargs() 2652 2653 if inspect.iscoroutine(entity) or inspect.isgenerator(entity) or inspect.iscoroutinefunction(entity) or inspect.isgeneratorfunction(entity) or inspect.isasyncgen(entity) or inspect.isasyncgenfunction(entity) or inspect.isawaitable(entity): 2654 return CoroType.awaitable 2655 elif callable(entity): 2656 return CoroType.greenlet 2657 else: 2658 raise TypeError(f'{entity} is neither an awaitable nor a greenlet')
2851class InterfaceAsyncAwait(Interface): 2852 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2853 super(InterfaceAsyncAwait, self).__init__(loop, coro) 2854 2855 @overload 2856 async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2857 2858 @overload 2859 async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2860 2861 @overload 2862 async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2863 2864 @overload 2865 async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2866 2867 @overload 2868 async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2869 2870 @overload 2871 async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2872 2873 @overload 2874 async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2875 2876 @overload 2877 async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2878 2879 async def __call__(self, service_type, *args, **kwargs): 2880 """ 2881 Should be called from inside coroutines only. 2882 Will request some long running work to some service. 2883 2884 :param coro_id: 2885 :param service_type: 2886 :param args: 2887 :param kwargs: 2888 """ 2889 2890 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2891 2892 self.in_work = True 2893 response = await yield_task_from_asyncawait(Request(self._coro, service_type, *args, **kwargs)) 2894 self.in_work = False 2895 if isinstance(response, Response): 2896 return response() 2897 2898 dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}") 2899 raise RuntimeError('Wrong type of response from the service')
Inherited Members
2902class InterfaceFake(Interface): 2903 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2904 self._loop: CoroSchedulerBase = None # type: CoroSchedulerBase 2905 self._coro: CoroWrapperBase = None # type: CoroWrapperBase 2906 self.coro_id: CoroID = None # type: CoroID 2907 self.in_work: bool = False 2908 2909 @overload 2910 def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2911 2912 @overload 2913 def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2914 2915 @overload 2916 def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2917 2918 @overload 2919 def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2920 2921 @overload 2922 def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2923 2924 @overload 2925 def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2926 2927 @overload 2928 def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2929 2930 @overload 2931 def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2932 2933 def __call__(self, service_type, *args, **kwargs) -> Any: 2934 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2935 return None
Inherited Members
2938class InterfaceFakeAsyncAwait(Interface): 2939 def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase): 2940 self._loop: CoroSchedulerBase = None # type: CoroSchedulerBase 2941 self._coro: CoroWrapperBase = None # type: CoroWrapperBase 2942 self.coro_id: CoroID = None # type: CoroID 2943 self.in_work: bool = False 2944 2945 @overload 2946 async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2947 2948 @overload 2949 async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2950 2951 @overload 2952 async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2953 2954 @overload 2955 async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ... 2956 2957 @overload 2958 async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ... 2959 2960 @overload 2961 async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2962 2963 @overload 2964 async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ... 2965 2966 @overload 2967 async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ... 2968 2969 async def __call__(self, service_type, *args, **kwargs): 2970 service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs) 2971 return None
Inherited Members
2974class CallerCoroInfo: 2975 def __init__(self, coro: CoroWrapperBase): 2976 self.coro: CoroWrapperBase = coro 2977 self.coro_id: CoroID = coro.coro_id 2978 self.coro_type: Type = type(coro)
2985class ServiceRequest: 2986 default_service_type: Optional[Type['Service']] = None 2987 default__request__type__: int = 0 2988 2989 def __init__(self): 2990 self.request_type: int = None # type: Optional[int] 2991 self.args: Optional[Tuple] = None # type: Optional[Tuple] 2992 self.kwargs: Optional[Dict] = None # type: Optional[Dict] 2993 self.provide_to_request_handler: bool = False 2994 2995 def _save(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest': 2996 self.request_type = __request__type__ 2997 self.args = args 2998 self.kwargs = kwargs 2999 return self 3000 3001 def _copy(self) -> 'ServiceRequest': 3002 raise NotImplementedError 3003 3004 def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest': 3005 return self._copy()._save(__request__type__, *args, **kwargs) 3006 3007 def __call__(self, *args: Any, **kwds: Any) -> 'ServiceRequest': 3008 """should call self._save() with some default __request__type__. Required for the Interface(Type[ServiceRequest], *args, **kwargs) call. Must return self 3009 3010 Returns: 3011 ServiceRequest: Must return self 3012 """ 3013 return self._save(self.default__request__type__, *args, **kwds) 3014 3015 def interface(self) -> Any: 3016 current_interface()(self.default_service_type, self) 3017 3018 i = interface 3019 3020 async def async_interface(self) -> Any: 3021 await current_interface()(self.default_service_type, self) 3022 3023 ai = async_interface 3024 3025 def __repr__(self): 3026 return f'<{self.__class__.__name__}(request_type: {self.request_type}, args: {self.args}, kwargs: {self.kwargs})>'
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
3033class ServiceRequestMethodMixin: 3034 def __init__(self, service: 'Service') -> None: 3035 self.service = service 3036 3037 def __call__(self, *args: Any, **kwds: Any) -> Any: 3038 raise NotImplementedError 3039 3040 def full_processing_iteration(self): 3041 raise NotImplementedError 3042 3043 def in_work(self) -> bool: 3044 raise NotImplementedError 3045 3046 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 3047 raise NotImplementedError
3050class DualImmediateProcessingServiceMixin: 3051 def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]: 3052 if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)): 3053 return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs) 3054 else: 3055 return self.single_task_registration_or_immediate_processing_single(*args, **kwargs) 3056 3057 def single_task_registration_or_immediate_processing_multiple(self, request: ServiceRequest 3058 ) -> Tuple[bool, Any, None]: 3059 return self.resolve_request(request) 3060 3061 def single_task_registration_or_immediate_processing_single( 3062 self, *args, **kwargs 3063 ) -> Tuple[bool, Optional[CoroID], Any]: 3064 raise NotImplementedError
3051 def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]: 3052 if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)): 3053 return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs) 3054 else: 3055 return self.single_task_registration_or_immediate_processing_single(*args, **kwargs)
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
3073class Service(Iterable): 3074 def __init__(self, loop: CoroSchedulerBase): 3075 super(Service, self).__init__() 3076 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 3077 # self._requests = list() # type: List[Request] 3078 self._responses: List[Response] = list() # type: List[Response] 3079 self.current_caller_coro_info: Optional[CallerCoroInfo] = None # type: Optional[CallerCoroInfo] 3080 self._request_workers = dict() # type: Dict[int, Callable] 3081 3082 def iteration(self) -> Optional[List[Response]]: 3083 # requests = self._requests 3084 # self._requests = type(self._requests)() 3085 self._responses = list() 3086 # for request in requests: 3087 # self.current_caller_coro_info = CallerCoroInfo(request.coro_id) 3088 # result_exists, result = \ 3089 # self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3090 # if result_exists: 3091 # self.register_response(request.coro_id, result) 3092 # self.current_caller_coro_info = None 3093 try: 3094 self.full_processing_iteration() 3095 except: 3096 # if __debug__: dlog(sys.exc_info()) 3097 raise 3098 return self._responses 3099 3100 def make_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None): 3101 return Response(coro_id, type(self), response, exception) 3102 3103 def register_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None): 3104 self._responses.append(self.make_response(coro_id, response, exception)) 3105 3106 def put_task(self, request: Request) -> Optional[Response]: 3107 self.current_caller_coro_info = CallerCoroInfo(request.coro) 3108 result_exists, result, exception = \ 3109 self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3110 self.current_caller_coro_info = None 3111 if result_exists or exception: 3112 return self.make_response(request.coro_id, result, exception) 3113 return None 3114 3115 def resolve_request(self, request: ServiceRequest): 3116 try: 3117 if request.provide_to_request_handler: 3118 return self._request_workers[request.request_type](request, *request.args, **request.kwargs) 3119 else: 3120 return self._request_workers[request.request_type](*request.args, **request.kwargs) 3121 except: 3122 return True, None, get_exception() 3123 3124 def try_resolve_request(self, *args, **kwargs): 3125 possible_request: Optional[ServiceRequest] = None 3126 if (len(args) == 1) and (len(kwargs) == 0): 3127 possible_request = args[0] 3128 elif (len(kwargs) == 1) and (len(args) == 0): 3129 possible_request = kwargs.pop('request', None) 3130 3131 if possible_request is not None: 3132 if isinstance(possible_request, ServiceRequest): 3133 return self.resolve_request(possible_request) 3134 3135 return None 3136 3137 def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> ServiceProcessingResponse: 3138 raise NotImplementedError 3139 3140 def full_processing_iteration(self): 3141 raise NotImplementedError 3142 3143 def in_work(self) -> bool: 3144 """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution 3145 3146 Raises: 3147 NotImplementedError: _description_ 3148 3149 Returns: 3150 bool: _description_ 3151 """ 3152 raise NotImplementedError 3153 3154 def in_forground_work(self) -> bool: 3155 return True 3156 3157 def thrifty_in_work(self, result: bool) -> bool: 3158 if result: 3159 return True 3160 else: 3161 self.make_dead() 3162 return False 3163 3164 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 3165 return False, None 3166 3167 def is_low_latency(self) -> bool: 3168 return False 3169 3170 def make_live(self): 3171 self._loop.make_service_live_fast(type(self), self.is_low_latency()) 3172 3173 def make_dead(self): 3174 self._loop.make_service_dead_fast(type(self)) 3175 3176 @staticmethod 3177 def service_id_impl(): 3178 return None 3179 3180 @staticmethod 3181 def service_id(service_type: Type): 3182 service_id = service_type.service_id_impl() 3183 if service_id is None: 3184 service_id = service_type.__name__ 3185 3186 return service_id 3187 3188 def destroy(self): 3189 pass
3074 def __init__(self, loop: CoroSchedulerBase): 3075 super(Service, self).__init__() 3076 self._loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 3077 # self._requests = list() # type: List[Request] 3078 self._responses: List[Response] = list() # type: List[Response] 3079 self.current_caller_coro_info: Optional[CallerCoroInfo] = None # type: Optional[CallerCoroInfo] 3080 self._request_workers = dict() # type: Dict[int, Callable]
3082 def iteration(self) -> Optional[List[Response]]: 3083 # requests = self._requests 3084 # self._requests = type(self._requests)() 3085 self._responses = list() 3086 # for request in requests: 3087 # self.current_caller_coro_info = CallerCoroInfo(request.coro_id) 3088 # result_exists, result = \ 3089 # self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3090 # if result_exists: 3091 # self.register_response(request.coro_id, result) 3092 # self.current_caller_coro_info = None 3093 try: 3094 self.full_processing_iteration() 3095 except: 3096 # if __debug__: dlog(sys.exc_info()) 3097 raise 3098 return self._responses
should return False if ready to stop looping :return:
3106 def put_task(self, request: Request) -> Optional[Response]: 3107 self.current_caller_coro_info = CallerCoroInfo(request.coro) 3108 result_exists, result, exception = \ 3109 self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs) 3110 self.current_caller_coro_info = None 3111 if result_exists or exception: 3112 return self.make_response(request.coro_id, result, exception) 3113 return None
3115 def resolve_request(self, request: ServiceRequest): 3116 try: 3117 if request.provide_to_request_handler: 3118 return self._request_workers[request.request_type](request, *request.args, **request.kwargs) 3119 else: 3120 return self._request_workers[request.request_type](*request.args, **request.kwargs) 3121 except: 3122 return True, None, get_exception()
3124 def try_resolve_request(self, *args, **kwargs): 3125 possible_request: Optional[ServiceRequest] = None 3126 if (len(args) == 1) and (len(kwargs) == 0): 3127 possible_request = args[0] 3128 elif (len(kwargs) == 1) and (len(args) == 0): 3129 possible_request = kwargs.pop('request', None) 3130 3131 if possible_request is not None: 3132 if isinstance(possible_request, ServiceRequest): 3133 return self.resolve_request(possible_request) 3134 3135 return None
3143 def in_work(self) -> bool: 3144 """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution 3145 3146 Raises: 3147 NotImplementedError: _description_ 3148 3149 Returns: 3150 bool: _description_ 3151 """ 3152 raise NotImplementedError
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Inherited Members
- Service
- Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- single_task_registration_or_immediate_processing
- full_processing_iteration
- in_work
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
2092class CoroWrapperBase: 2093 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2094 if isinstance(worker, EntityArgsHolder): 2095 worker, args, kwargs = worker.entity_args_kwargs() 2096 2097 self.worker = worker # type: Worker 2098 self.init_args = args # type: Tuple[Any, ...] 2099 self.init_kwargs = kwargs # type: Dict 2100 self.coro_id: CoroID = coro_id # type: CoroID 2101 self.loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2102 self.coro = None # type: Optional[Coro] 2103 self.parent_coro = None # type: Optional[Coro] 2104 self.interface = None # type: Optional[Interface] 2105 self.last_result = None # type: Optional[Union[Request, Any]] # return value can be any 2106 self.exception = None # type: Optional[Exception] # If there was an exception 2107 self.coro_on_del_handlers = set() # type: Set[Callable] 2108 self._make_coro_method = self._raise_not_implemented_error # type: Callable 2109 self._make_interface = self._raise_not_implemented_error # type: Callable 2110 self._init_method = self._raise_not_implemented_error # type: Callable 2111 self._call_method = self._raise_not_implemented_error # type: Callable 2112 self._throw_method = self._raise_not_implemented_error # type: Callable 2113 self._close_method = self._raise_not_implemented_error # type: Callable 2114 self._current_call_method = self._call_method # type: Callable 2115 self._is_background_coro: bool = False 2116 self.loop.foreground_coro_num += 1 2117 2118 def _travers_through_coro_on_del_handlers(self, coro_exit_status: 'CoroExitStatus'): 2119 if not self.coro_on_del_handlers: 2120 return coro_exit_status 2121 2122 exception_properly_handled = coro_exit_status.properly_handled 2123 for handler in self.coro_on_del_handlers: 2124 exception_properly_handled = handler(self) or exception_properly_handled 2125 coro_exit_status.properly_handled = exception_properly_handled 2126 return coro_exit_status 2127 2128 def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']: 2129 self.parent_coro = parent_coro 2130 self.coro = self._make_coro_method() 2131 self.interface = self._make_interface() 2132 current_coro_interface_buff = self.loop.current_coro_interface 2133 self.loop.current_coro_interface = self.interface 2134 try: 2135 self._init_method(self.init_args, self.init_kwargs) 2136 if self: 2137 return None 2138 else: 2139 # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}') 2140 return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True)) 2141 except: 2142 self.exception = get_exception() 2143 if not self.coro_on_del_handlers: 2144 return CoroExitStatus(self.exception, False) 2145 return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False)) 2146 finally: 2147 self.loop.current_coro_interface = current_coro_interface_buff 2148 2149 def __call__(self, *args, **kwargs) -> Union[None, 'CoroExitStatus']: 2150 current_coro_interface_buff = self.loop.current_coro_interface 2151 self.loop.current_coro_interface = self.interface 2152 try: 2153 self._current_call_method(*args, **kwargs) 2154 if self: 2155 return None 2156 else: 2157 # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}') 2158 return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True)) 2159 except: 2160 self.exception = get_exception() 2161 if not self.coro_on_del_handlers: 2162 return CoroExitStatus(self.exception, False) 2163 return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False)) 2164 finally: 2165 self.loop.current_coro_interface = current_coro_interface_buff 2166 2167 def __bool__(self) -> bool: 2168 raise NotImplementedError 2169 2170 def add_on_coro_del_handler(self, callback: OnCoroDelHandler): 2171 self.coro_on_del_handlers.add(callback) 2172 2173 def _raise_not_implemented_error(self, *args, **kwargs): 2174 pass 2175 raise NotImplementedError 2176 return self._raise_not_implemented_error # Suppressing lint error 2177 2178 def _current_throw_method_helper(self, *args, **kwargs): 2179 self._throw_method(*args, **kwargs) 2180 self._current_call_method = self._call_method 2181 2182 def request_throw(self) -> Any: 2183 self._current_call_method = self._current_throw_method_helper 2184 2185 def _current_close_method_helper(self, *args, **kwargs): 2186 self._close_method() 2187 self._current_call_method = self._call_method 2188 2189 def request_close(self) -> Any: 2190 self._current_call_method = self._current_close_method_helper 2191 2192 @property 2193 def is_background_coro(self): 2194 return self._is_background_coro 2195 2196 @is_background_coro.setter 2197 def is_background_coro(self, value: bool): 2198 if value: 2199 if not self._is_background_coro: 2200 self.loop.foreground_coro_num -= 1 2201 else: 2202 if self._is_background_coro: 2203 self.loop.foreground_coro_num += 1 2204 2205 self._is_background_coro = value 2206 2207 def destroy(self): 2208 if self.interface is not None: 2209 if isinstance(self.interface, Interface): 2210 self.interface.destroy() 2211 2212 if not self.is_background_coro: 2213 self.loop.foreground_coro_num -= 1 2214 2215 self._is_background_coro = None 2216 self.init_args = None 2217 self.init_kwargs = None 2218 self.coro_id = None 2219 self.worker = None 2220 self.loop = None 2221 self.coro = None 2222 self.parent_coro = None 2223 self.interface = None 2224 self.last_result = None 2225 self.exception = None 2226 self.coro_on_del_handlers = None 2227 self._make_coro_method = None 2228 self._make_interface = None 2229 self._init_method = None 2230 self._call_method = None 2231 self._throw_method = None 2232 self._close_method = None 2233 self._current_call_method = None
2093 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2094 if isinstance(worker, EntityArgsHolder): 2095 worker, args, kwargs = worker.entity_args_kwargs() 2096 2097 self.worker = worker # type: Worker 2098 self.init_args = args # type: Tuple[Any, ...] 2099 self.init_kwargs = kwargs # type: Dict 2100 self.coro_id: CoroID = coro_id # type: CoroID 2101 self.loop: CoroSchedulerBase = loop # type: CoroSchedulerBase 2102 self.coro = None # type: Optional[Coro] 2103 self.parent_coro = None # type: Optional[Coro] 2104 self.interface = None # type: Optional[Interface] 2105 self.last_result = None # type: Optional[Union[Request, Any]] # return value can be any 2106 self.exception = None # type: Optional[Exception] # If there was an exception 2107 self.coro_on_del_handlers = set() # type: Set[Callable] 2108 self._make_coro_method = self._raise_not_implemented_error # type: Callable 2109 self._make_interface = self._raise_not_implemented_error # type: Callable 2110 self._init_method = self._raise_not_implemented_error # type: Callable 2111 self._call_method = self._raise_not_implemented_error # type: Callable 2112 self._throw_method = self._raise_not_implemented_error # type: Callable 2113 self._close_method = self._raise_not_implemented_error # type: Callable 2114 self._current_call_method = self._call_method # type: Callable 2115 self._is_background_coro: bool = False 2116 self.loop.foreground_coro_num += 1
2128 def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']: 2129 self.parent_coro = parent_coro 2130 self.coro = self._make_coro_method() 2131 self.interface = self._make_interface() 2132 current_coro_interface_buff = self.loop.current_coro_interface 2133 self.loop.current_coro_interface = self.interface 2134 try: 2135 self._init_method(self.init_args, self.init_kwargs) 2136 if self: 2137 return None 2138 else: 2139 # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}') 2140 return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True)) 2141 except: 2142 self.exception = get_exception() 2143 if not self.coro_on_del_handlers: 2144 return CoroExitStatus(self.exception, False) 2145 return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False)) 2146 finally: 2147 self.loop.current_coro_interface = current_coro_interface_buff
2207 def destroy(self): 2208 if self.interface is not None: 2209 if isinstance(self.interface, Interface): 2210 self.interface.destroy() 2211 2212 if not self.is_background_coro: 2213 self.loop.foreground_coro_num -= 1 2214 2215 self._is_background_coro = None 2216 self.init_args = None 2217 self.init_kwargs = None 2218 self.coro_id = None 2219 self.worker = None 2220 self.loop = None 2221 self.coro = None 2222 self.parent_coro = None 2223 self.interface = None 2224 self.last_result = None 2225 self.exception = None 2226 self.coro_on_del_handlers = None 2227 self._make_coro_method = None 2228 self._make_interface = None 2229 self._init_method = None 2230 self._call_method = None 2231 self._throw_method = None 2232 self._close_method = None 2233 self._current_call_method = None
2236class CoroWrapperGreenlet(CoroWrapperBase): 2237 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2238 super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs) 2239 self._make_coro_method = self._make_coro_method_imp 2240 self._make_interface = self._make_interface_imp 2241 self._init_method = self._init_method_imp 2242 self._call_method = self._call_method_imp 2243 self._throw_method = self._throw_method_imp # type: Callable 2244 self._close_method = self._close_method_imp # type: Callable 2245 self._current_call_method = self._call_method # type: Callable 2246 2247 def _make_coro_method_imp(self): 2248 return greenlet(self.worker, self.parent_coro) 2249 2250 def _make_interface_imp(self): 2251 return InterfaceGreenlet(self.loop, self) 2252 2253 def _init_method_imp(self, init_args, init_kwargs): 2254 try: 2255 # if __debug__: dlog(f'λ => (init): {func_info(self.worker.worker)}') 2256 self.last_result = self.coro.switch(self.interface, *init_args, **init_kwargs) # TODO: wrong 2257 # if __debug__: dlog(f'λ <= (init): {repr(self.worker.worker)}') 2258 except GreenletExit: 2259 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2260 self.last_result = self.interface.result 2261 except: 2262 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2263 self.last_result = None 2264 raise 2265 2266 def _call_method_imp(self, *args, **kwargs): 2267 try: 2268 # if __debug__: dlog(f'λ => (call): {func_info(self.worker.worker)}') 2269 self.last_result = self.coro.switch(*args, **kwargs) # TODO: wrong 2270 # if __debug__: dlog(f'λ <= (call): {repr(self.worker.worker)}') 2271 except GreenletExit: 2272 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2273 self.last_result = self.interface.result 2274 except: 2275 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2276 self.last_result = None 2277 raise 2278 2279 # def _throw_method_imp(self, *args, **kwargs): 2280 def _throw_method_imp(self, ex_type, ex_value=None, ex_traceback=None): 2281 try: 2282 # if __debug__: dlog(f'λ => (throw): {func_info(self.worker.worker)}') 2283 self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback) 2284 # if __debug__: dlog(f'λ <= (throw): {repr(self.worker.worker)}') 2285 except GreenletExit: 2286 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2287 self.last_result = self.interface.result 2288 except: 2289 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2290 self.last_result = None 2291 raise 2292 2293 def _close_method_imp(self): 2294 try: 2295 # if __debug__: dlog(f'λ => (close): {func_info(self.worker.worker)}') 2296 self.last_result = self.coro.throw() 2297 # if __debug__: dlog(f'λ <= (close): {repr(self.worker.worker)}') 2298 except GreenletExit: 2299 # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}') 2300 self.last_result = self.interface.result 2301 except: 2302 # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}') 2303 self.last_result = None 2304 raise 2305 2306 def __bool__(self) -> bool: 2307 return bool(self.coro) 2308 2309 def destroy(self): 2310 if isinstance(self.worker, GreenletWorkerWrapper): 2311 self.worker.destroy() 2312 2313 return super().destroy()
2237 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2238 super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs) 2239 self._make_coro_method = self._make_coro_method_imp 2240 self._make_interface = self._make_interface_imp 2241 self._init_method = self._init_method_imp 2242 self._call_method = self._call_method_imp 2243 self._throw_method = self._throw_method_imp # type: Callable 2244 self._close_method = self._close_method_imp # type: Callable 2245 self._current_call_method = self._call_method # type: Callable
2370class CoroWrapperAsyncAwait(CoroWrapperBase): 2371 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2372 super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs) 2373 self._make_coro_method = self._make_coro_method_imp 2374 self._make_interface = self._make_interface_imp 2375 self.in_run_state = False 2376 self.subtype = self._setup_subtype() 2377 self._current_call_method = self._call_method # type: Callable 2378 2379 def _make_coro_method_imp(self): 2380 return self.worker 2381 2382 def _make_interface_imp(self): 2383 return InterfaceAsyncAwait(self.loop, self) 2384 2385 def __bool__(self) -> bool: 2386 return bool(self.in_run_state) 2387 2388 def _setup_subtype(self): 2389 if inspect.iscoroutine(self.worker): 2390 # if __debug__: dlog('λ >>>\tCOROUTINE') 2391 self._init_method = self._init_coroutine 2392 self._call_method = self._call_coroutine 2393 self._throw_method = self._throw_coroutine 2394 self._close_method = self._close_coroutine 2395 return 0 2396 elif inspect.isgenerator(self.worker): 2397 # if __debug__: dlog('λ >>>\tGENERATOR') 2398 self._init_method = self._init_generator 2399 self._call_method = self._call_coroutine 2400 self._throw_method = self._throw_coroutine 2401 self._close_method = self._close_coroutine 2402 return 1 2403 elif inspect.iscoroutinefunction(self.worker): 2404 # if __debug__: dlog('λ >>>\tCOROUTINE FUNCTION') 2405 self._init_method = self._init_coroutinefunction 2406 self._call_method = self._call_coroutine 2407 self._throw_method = self._throw_coroutine 2408 self._close_method = self._close_coroutine 2409 return 2 2410 elif inspect.isgeneratorfunction(self.worker): 2411 # if __debug__: dlog('λ >>>\tGENERATOR FUNCTION') 2412 self._init_method = self._init_generatorfunction 2413 self._call_method = self._call_coroutine 2414 self._throw_method = self._throw_coroutine 2415 self._close_method = self._close_coroutine 2416 return 3 2417 elif inspect.isasyncgen(self.worker): 2418 # if __debug__: dlog('λ >>>\tASYNC GENERATOR') 2419 self._init_method = self._init_asyncgenerator 2420 self._call_method = self._call_asyncgenerator 2421 self._throw_method = self._throw_asyncgenerator 2422 self._close_method = self._close_asyncgenerator 2423 return 4 2424 elif inspect.isasyncgenfunction(self.worker): 2425 # if __debug__: dlog('λ >>>\tASYNC GENERATOR FUNCTION') 2426 self._init_method = self._init_asyncgeneratorfunction 2427 self._call_method = self._call_asyncgenerator 2428 self._throw_method = self._throw_asyncgenerator 2429 self._close_method = self._close_asyncgenerator 2430 return 5 2431 elif inspect.isawaitable(self.worker): 2432 # if __debug__: dlog('λ >>>\tAWAITABLE') 2433 self._init_method = self._init_awaitable 2434 self._call_method = self._call_coroutine 2435 self._throw_method = self._throw_coroutine 2436 self._close_method = self._close_coroutine 2437 return 6 2438 elif callable(self.worker): 2439 # if __debug__: dlog('λ >>>\tCALLABLE') 2440 self._init_method = self._init_callable 2441 self._call_method = self._call_coroutine 2442 self._throw_method = self._throw_coroutine 2443 self._close_method = self._close_coroutine 2444 return 7 2445 else: 2446 raise TypeError(f'{self.worker} is neither an awaitable nor a wrapper for an awaitable') 2447 2448 def _init_coroutine(self, init_args, init_kwargs): 2449 try: 2450 self.in_run_state = True 2451 self.last_result = self.coro.send(None) 2452 except StopIteration as ex: 2453 self.in_run_state = False 2454 self.last_result = ex.value 2455 except: 2456 self.in_run_state = False 2457 raise 2458 2459 def _call_coroutine(self, *args, **kwargs): 2460 try: 2461 self.in_run_state = True 2462 self.last_result = self.coro.send(*args, **kwargs) 2463 except StopIteration as ex: 2464 self.in_run_state = False 2465 self.last_result = ex.value 2466 except: 2467 self.in_run_state = False 2468 raise 2469 2470 if sys.version_info >= (3, 12): 2471 # if (3, 12) <= PYTHON_VERSION_INT: 2472 def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None): 2473 try: 2474 self.in_run_state = True 2475 if ex_value is None: 2476 ex_value = ex_type() 2477 2478 self.last_result = self.coro.throw(ex_value) # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python. 2479 except StopIteration as ex: 2480 self.in_run_state = False 2481 self.last_result = ex.value 2482 except: 2483 self.in_run_state = False 2484 raise 2485 else: 2486 def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None): 2487 try: 2488 self.in_run_state = True 2489 self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback) # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python. 2490 except StopIteration as ex: 2491 self.in_run_state = False 2492 self.last_result = ex.value 2493 except: 2494 self.in_run_state = False 2495 raise 2496 2497 def _close_coroutine(self, *args, **kwargs): 2498 try: 2499 self.in_run_state = True 2500 self.last_result = self.coro.close() 2501 self.in_run_state = False 2502 except GeneratorExit as ex: 2503 self.in_run_state = False 2504 except: 2505 self.in_run_state = False 2506 raise 2507 2508 def _init_generator(self, init_args, init_kwargs): 2509 try: 2510 self.in_run_state = True 2511 self.last_result = next(self.coro) # ToDo: investigate how to provide an initial parameters 2512 except StopIteration as ex: 2513 self.in_run_state = False 2514 self.last_result = ex.value 2515 except: 2516 self.in_run_state = False 2517 raise 2518 2519 def _init_coroutinefunction(self, init_args, init_kwargs): 2520 self.coro = self.coro(self.interface, *init_args, **init_kwargs) 2521 self._init_coroutine(None, None) 2522 2523 def _init_generatorfunction(self, init_args, init_kwargs): 2524 self.coro = self.coro(self.interface, *init_args, **init_kwargs) 2525 self._init_generator(None, None) 2526 2527 def _init_asyncgenerator(self, init_args, init_kwargs): 2528 try: 2529 self.in_run_state = True 2530 entity = init_asyncgenerator(self.coro) 2531 result = entity.send(None) 2532 except StopIteration as ex: 2533 self.in_run_state = False 2534 result = ex.value 2535 except: 2536 self.in_run_state = False 2537 raise 2538 2539 self.last_result, exception = result 2540 if exception is not None: 2541 self.in_run_state = False 2542 if not isinstance(exception, StopAsyncIteration): 2543 raise exception 2544 2545 def _call_asyncgenerator(self, *args, **kwargs): 2546 try: 2547 self.in_run_state = True 2548 entity = call_asyncgenerator(self.coro, *args, **kwargs) 2549 result = entity.send(None) 2550 except StopIteration as ex: 2551 self.in_run_state = False 2552 result = ex.value 2553 except: 2554 self.in_run_state = False 2555 raise 2556 2557 self.last_result, exception = result 2558 if exception is not None: 2559 self.in_run_state = False 2560 if not isinstance(exception, StopAsyncIteration): 2561 raise exception 2562 2563 def _throw_asyncgenerator(self, *args, **kwargs): 2564 try: 2565 self.in_run_state = True 2566 entity = throw_asyncgenerator(self.coro, *args, **kwargs) 2567 result = entity.send(None) 2568 except StopIteration as ex: 2569 self.in_run_state = False 2570 result = ex.value 2571 except: 2572 self.in_run_state = False 2573 raise 2574 2575 self.last_result, exception = result 2576 if exception is not None: 2577 self.in_run_state = False 2578 if not isinstance(exception, StopAsyncIteration): 2579 raise exception 2580 2581 def _close_asyncgenerator(self): 2582 try: 2583 self.in_run_state = True 2584 entity = close_asyncgenerator(self.coro) 2585 result = entity.send(None) 2586 except StopIteration as ex: # TODO: check maybe `GeneratorExit` will be raised 2587 self.in_run_state = False 2588 result = ex.value 2589 except: 2590 self.in_run_state = False 2591 raise 2592 2593 self.last_result, exception = result 2594 if exception is not None: 2595 self.in_run_state = False 2596 if not isinstance(exception, StopAsyncIteration): 2597 raise exception 2598 2599 def _init_asyncgeneratorfunction(self, init_args, init_kwargs): 2600 try: 2601 self.in_run_state = True 2602 entity = init_asyncgeneratorfunction(self.coro, self.interface, *init_args, **init_kwargs) 2603 result = entity.send(None) 2604 except StopIteration as ex: 2605 self.in_run_state = False 2606 result = ex.value 2607 except: 2608 self.in_run_state = False 2609 raise 2610 2611 self.coro, self.last_result, exception = result 2612 if exception is not None: 2613 self.in_run_state = False 2614 if not isinstance(exception, StopAsyncIteration): 2615 raise exception 2616 2617 def _init_awaitable(self, init_args, init_kwargs): 2618 self.coro = awaitable_wrapper(self.coro) 2619 self._init_coroutine() 2620 2621 def _init_callable(self, init_args, init_kwargs): 2622 self.coro = callable_wrapper(self.coro, self.interface, *init_args, **init_kwargs) 2623 self._init_coroutine() 2624 2625 def destroy(self): 2626 self._make_coro_method = None 2627 self._make_interface = None 2628 self.in_run_state = None 2629 self.subtype = None 2630 return super().destroy()
2371 def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs): 2372 super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs) 2373 self._make_coro_method = self._make_coro_method_imp 2374 self._make_interface = self._make_interface_imp 2375 self.in_run_state = False 2376 self.subtype = self._setup_subtype() 2377 self._current_call_method = self._call_method # type: Callable
291def log_exception_traceback_info(): 292 exception = sys.exc_info() 293 formattedTraceback = traceback.format_exception(exception[0], exception[1], exception[2]) 294 exception = exception[:2] + (formattedTraceback,) 295 trace = '' 296 for line in exception[2]: 297 trace += line 298 if __debug__: dlog(trace, file=sys.stderr) 299 if __debug__: dlog(exception[0]) 300 if __debug__: dlog(exception[1])
312def func_info(func, full_name: Optional[bool]=True): 313 if full_name: 314 # return f'{func.__class__}({func.__module__}.{func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}' 315 return f'{func.__class__}({func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}' 316 else: 317 return f'{func.__class__}({func.__qualname__}) @ {os.path.basename(func.__code__.co_filename)}:{func.__code__.co_firstlineno}'
347def full_func_info_to_dict(my): 348 return { 349 'repr': repr(my), 350 'module': str(my.__module__), 351 'name': str(my.__name__), 352 'qualname': str(my.__qualname__), 353 'annotations': str(my.__annotations__), 354 'class': str(my.__class__), 355 'closure': str(my.__closure__), 356 'code': str(my.__code__), 357 'co_argcount': str(my.__code__.co_argcount), 358 'co_cellvars': str(my.__code__.co_cellvars), 359 'co_code': str(my.__code__.co_code), 360 'co_consts': str(my.__code__.co_consts), 361 'co_filename': str(my.__code__.co_filename), 362 'co_firstlineno': str(my.__code__.co_firstlineno), 363 'co_flags': str(my.__code__.co_flags), 364 'co_freevars': str(my.__code__.co_freevars), 365 'co_kwonlyargcount': str(my.__code__.co_kwonlyargcount), 366 'co_lnotab': str(my.__code__.co_lnotab), 367 'co_name': str(my.__code__.co_name), 368 'co_names': str(my.__code__.co_names), 369 'co_nlocals': str(my.__code__.co_nlocals), 370 'co_stacksize': str(my.__code__.co_stacksize), 371 'co_varnames': str(my.__code__.co_varnames), 372 }
375def full_func_info_to_printable_dict(func: Callable) -> Dict[str, str]: 376 func_info: Dict[str, str] = full_func_info_to_dict(func) 377 good_keys = {'module', 'qualname', 'class', 'co_filename', 'co_firstlineno'} 378 result: Dict[str, str] = dict() 379 for key, value in func_info.items(): 380 if key in good_keys: 381 result[key] = value 382 383 return result
2633class GreenletWorkerWrapper: 2634 def __init__(self, worker: Worker): 2635 self.worker = worker 2636 2637 def __call__(self, interface: 'InterfaceGreenlet', *args, **kwargs): 2638 result = None 2639 try: 2640 result = self.worker(interface, *args, **kwargs) 2641 finally: 2642 interface.register_result(result) # in case if GreenletExit will be raised by greenlet framework 2643 return result 2644 2645 def destroy(self): 2646 self.worker = None
857class EntityStatsMixin: 858 class StatsLevel(Enum): 859 info = 0 860 debug = 1 861 862 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel') -> Tuple[str, Dict[str, Any]]: 863 raise NotImplementedError
An enumeration.
Inherited Members
- enum.Enum
- name
- value
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all exceptions
Inherited Members
- builtins.BaseException
- BaseException
- with_traceback
- args