cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19"""
  20Module Docstring
  21Docstrings: http://www.python.org/dev/peps/pep-0257/
  22"""
  23
  24
  25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
  26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
  27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
  28__license__ = "Apache License, Version 2.0"
  29__version__ = "4.4.1"
  30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
  31__email__ = "gtalk@butenkoms.space"
  32# __status__ = "Prototype"
  33__status__ = "Development"
  34# __status__ = "Production"
  35
  36
  37__all__ = [
  38    'Counter', 'Iterable', 'ServiceResponseTypeVar', 'ServiceType', 'TypedServiceType', 'NormalizableServiceType', 
  39    'ItemID', 'CoroID', 'Worker', 'GreenetCoro', 'ACoro', 'Coro', 'OnCoroDelHandler', 'cs_coro', 'cs_acoro', 'cs_callable', 'cs_acallable', 
  40    'OutsideCoroSchedulerContext', 'current_coro_scheduler', 'get_current_coro_scheduler', 'set_primary_coro_scheduler', 
  41    'PrimaryCoroSchedulerWasNotSet', 'primary_coro_scheduler', 'get_primary_coro_scheduler', 'CoroSchedulerContextIsNotAvailable', 
  42    'available_coro_scheduler', 'get_available_coro_scheduler', 'WrongTypeOfShedulerError', 'InterfaceIsNotAvailableError', 
  43    'CurrentCoroIsNotAliveError', 'loop_with_backup_loop', 'get_loop_with_backup_loop', 'loop_with_explicit_loop', 
  44    'get_loop_with_explicit_loop', 'interface_and_loop_with_backup_loop', 'get_interface_and_loop_with_backup_loop', 
  45    'interface_and_loop_with_explicit_loop', 'get_interface_and_loop_with_explicit_loop', 'interface_for_an_explicit_loop', 
  46    'get_interface_for_an_explicit_loop', 'service_with_backup_loop', 'get_service_with_backup_loop', 'service_with_explicit_loop', 
  47    'get_service_with_explicit_loop', 'service_fast_with_backup_loop', 'get_service_fast_with_backup_loop', 
  48    'service_fast_with_explicit_loop', 'get_service_fast_with_explicit_loop', 'CoroType', 'ExplicitWorker', 'AnyWorker', 
  49    'CoroScheduler', 'CoroSchedulerType' , 'CoroSchedulerGreenlet', 'CoroSchedulerAwaitable', 'current_interface', 'execute_coro', 'exec_coro', 'ecoro', 'aexecute_coro', 'aexec_coro', 'aecoro', 
  50    'around_await', 'Request', 'Response', 'DirectResponse', 'Interface', 'InterfaceGreenlet', 'find_coro_type', 
  51    'InterfaceAsyncAwait', 'InterfaceFake', 'InterfaceFakeAsyncAwait', 'CallerCoroInfo', 'ServiceRequest', 'TypedServiceRequest', 
  52    'ServiceRequestMethodMixin', 'DualImmediateProcessingServiceMixin', 'WrongServiceRequestError', 'Service', 'TypedService', 
  53    'CoroWrapperBase', 'CoroWrapperGreenlet', 'CoroWrapperAsyncAwait', 'dlog', 'log_exception_traceback_info', 
  54    'log_uncatched_exception', 'func_info', 'ServiceProcessingResultExists', 'ServiceProcessingResult', 'ServiceProcessingException', 
  55    'ServiceProcessingResponse', 'full_func_info_to_dict', 'full_func_info_to_printable_dict', 'GreenletWorkerWrapper', 
  56    'EntityStatsMixin', 'CoroSchedulerIsCurrentlyDestroingError', 'CoroSchedulerDestroyException', 
  57    'CoroSchedulerDestroyRequestedException', 'greenlet_awailable', 'GreenletExit'
  58]
  59
  60import sys
  61import os
  62import inspect
  63from contextlib import contextmanager
  64from typing import Coroutine, Dict, Tuple, List, Callable, Awaitable, Any, Optional, Type, Set, Union, Generator, AsyncGenerator, overload, Generic, TypeVar
  65# try:
  66#     from typing import TypeAlias, ParamSpec, Concatenate
  67# except ImportError:
  68#     from typing_extensions import TypeAlias, ParamSpec, Concatenate
  69from typing_extensions import TypeAlias, ParamSpec, Concatenate
  70from enum import Enum
  71import types
  72from cengal.time_management.cpu_clock_cycles import perf_counter
  73import traceback
  74from contextlib import contextmanager
  75import logging
  76from datetime import datetime
  77from cengal.introspection.inspect import get_exception, exception_to_printable_text, is_async
  78from cengal.code_flow_control.args_manager import args_kwargs
  79from threading import local
  80from cengal.time_management.sleep_tools import sleep
  81from collections import deque
  82from cengal.code_flow_control.args_manager import EntityArgsHolder, EntityArgsHolderExplicit
  83from cengal.time_management.cpu_clock_cycles import cpu_clock_cycles
  84from cengal.system import PYTHON_VERSION_INT
  85from functools import wraps, update_wrapper
  86
  87
  88greenlet_awailable: bool = True
  89try:
  90    from greenlet import greenlet, GreenletExit
  91except ImportError:
  92    greenlet_awailable = False
  93    class GreenletExit(Exception):
  94        pass
  95
  96# greenlet_awailable = False
  97
  98class Counter:
  99    def __init__(self):
 100        self._index = -1  # type: int
 101
 102    def get(self) -> int:
 103        self._index += 1
 104        return self._index
 105
 106
 107class Iterable:
 108    def iteration(self) -> bool:
 109        """
 110        should return False if ready to stop looping
 111        :return:
 112        """
 113        raise NotImplementedError
 114
 115
 116ServiceResponseTypeVar = TypeVar('ServiceResponseTypeVar')
 117
 118
 119ServiceType = Type['Service']
 120TypedServiceType = Type['TypedService[ServiceResponseTypeVar]']
 121NormalizableServiceType = Union[Type['Service'], Type['TypedService[ServiceResponseTypeVar]'], Type['ServiceRequest'], Type['TypedServiceRequest[ServiceResponseTypeVar]'], 'Service', 'TypedService[ServiceResponseTypeVar]', 'ServiceRequest', 'TypedServiceRequest[ServiceResponseTypeVar]']
 122ItemID = int
 123CoroID = ItemID
 124# Worker = Callable[['Interface'], Any]
 125if sys.version_info >= (3, 10):
 126    GreenletWoker = Callable[['Interface', ...], Any]
 127    AsyncWoker = Callable[['Interface', ...], Awaitable[Any]]
 128    Worker = Union[GreenletWoker, AsyncWoker]
 129else:
 130    GreenletWoker = Callable[['Interface'], Any]
 131    AsyncWoker = Callable[['Interface'], Awaitable[Any]]
 132    Worker = Union[GreenletWoker, AsyncWoker]
 133
 134GreenetCoro = greenlet
 135ACoro = Union[Awaitable, Coroutine, Generator, AsyncGenerator, Callable]
 136Coro = Union[GreenetCoro, ACoro]
 137OnCoroDelHandler = Callable[['CoroWrapperBase'], bool]
 138
 139
 140CoroParams = ParamSpec("CoroParams")
 141CoroResult = TypeVar("CoroResult")
 142
 143
 144def cs_coro(coro_worker: Callable[CoroParams, CoroResult]) -> Callable[Concatenate['Interface', CoroParams], CoroResult]:
 145    """Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter)
 146
 147    Args:
 148        coro_worker (Callable): _description_
 149    """
 150    if is_async(coro_worker):
 151        async def wrapper(i: Interface, *args, **kwargs):
 152            return await coro_worker(*args, **kwargs)
 153            
 154        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
 155        update_wrapper(wrapper, coro_worker)
 156        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 157        # wrapper.__name__ = coro_worker.__name__
 158        wrapper: coro_worker
 159        return wrapper
 160    else:
 161        def wrapper(i: Interface, *args, **kwargs):
 162            return coro_worker(*args, **kwargs)
 163            
 164        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
 165        update_wrapper(wrapper, coro_worker)
 166        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 167        # wrapper.__name__ = coro_worker.__name__
 168        wrapper: coro_worker
 169        return wrapper
 170
 171
 172def cs_acoro(coro_aworker: Callable[CoroParams, Awaitable[CoroResult]]) -> Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]:
 173    """Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter)
 174
 175    Args:
 176        coro_aworker (Callable): _description_
 177    """    
 178    if is_async(coro_aworker):
 179        async def wrapper(i: Interface, *args, **kwargs):
 180            return await coro_aworker(*args, **kwargs)
 181            
 182        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
 183        update_wrapper(wrapper, coro_aworker)
 184        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 185        # wrapper.__name__ = coro_aworker.__name__
 186        wrapper: coro_aworker
 187        return wrapper
 188    else:
 189        def wrapper(i: Interface, *args, **kwargs):
 190            return coro_aworker(*args, **kwargs)
 191            
 192        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
 193        update_wrapper(wrapper, coro_aworker)
 194        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 195        wrapper.__name__ = coro_aworker.__name__
 196        wrapper: coro_aworker
 197        return wrapper
 198
 199
 200def cs_callable(coro_worker: Callable[Concatenate['Interface', CoroParams], CoroResult]) -> Callable[CoroParams, CoroResult]:
 201    """Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine
 202
 203    Args:
 204        coro_worker (Callable): _description_
 205    """
 206    if is_async(coro_worker):
 207        async def wrapper(*args, **kwargs):
 208            i: Interface = current_interface()
 209            return await coro_worker(i, *args, **kwargs)
 210            
 211        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
 212        update_wrapper(wrapper, coro_worker)
 213        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 214        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
 215        # wrapper.__name__ = coro_worker.__name__
 216        wrapper: coro_worker
 217        return wrapper
 218    else:
 219        def wrapper(*args, **kwargs):
 220            i: Interface = current_interface()
 221            return coro_worker(i, *args, **kwargs)
 222            
 223        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
 224        update_wrapper(wrapper, coro_worker)
 225        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 226        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
 227        # wrapper.__name__ = coro_worker.__name__
 228        wrapper: coro_worker
 229        return wrapper
 230
 231
 232def cs_acallable(coro_aworker: Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]) -> Callable[CoroParams, Awaitable[CoroResult]]:
 233    """Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine
 234
 235    Args:
 236        coro_aworker (Callable): _description_
 237    """
 238    if is_async(coro_aworker):
 239        async def wrapper(*args, **kwargs):
 240            i: Interface = current_interface()
 241            return await coro_aworker(i, *args, **kwargs)
 242            
 243        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
 244        update_wrapper(wrapper, coro_aworker)
 245        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 246        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
 247        # wrapper.__name__ = coro_aworker.__name__
 248        wrapper: coro_aworker
 249        return wrapper
 250    else:
 251        def wrapper(*args, **kwargs):
 252            i: Interface = current_interface()
 253            return coro_aworker(i, *args, **kwargs)
 254            
 255        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
 256        update_wrapper(wrapper, coro_aworker)
 257        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
 258        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
 259        # wrapper.__name__ = coro_aworker.__name__
 260        wrapper: coro_aworker
 261        return wrapper
 262
 263
 264class _ThreadLocalCoroScheduler(local):
 265    scheduler: Optional['CoroSchedulerType'] = None
 266
 267
 268_primary_coro_scheduler: _ThreadLocalCoroScheduler = _ThreadLocalCoroScheduler()
 269_current_coro_scheduler: _ThreadLocalCoroScheduler = _ThreadLocalCoroScheduler()
 270
 271
 272_debug_log_counter = 0
 273
 274
 275def _debug_log(*args, **kwargs):
 276    print(*args, **kwargs)
 277    global _debug_log_counter
 278    _debug_log_counter += 1
 279
 280
 281def _fake_debug_log(*args, **kwargs):
 282    pass
 283
 284
 285dlog = _fake_debug_log
 286if False and __debug__:
 287    dlog = _debug_log
 288
 289
 290def log_exception_traceback_info():
 291    exception = sys.exc_info()
 292    formattedTraceback = traceback.format_exception(exception[0], exception[1], exception[2])
 293    exception = exception[:2] + (formattedTraceback,)
 294    trace = ''
 295    for line in exception[2]:
 296        trace += line
 297    if __debug__: dlog(trace, file=sys.stderr)
 298    if __debug__: dlog(exception[0])
 299    if __debug__: dlog(exception[1])
 300
 301
 302@contextmanager
 303def log_uncatched_exception():
 304    try:
 305        yield
 306    except:
 307        log_exception_traceback_info()
 308        raise
 309
 310
 311def func_info(func, full_name: Optional[bool]=True):
 312    if full_name:
 313        # return f'{func.__class__}({func.__module__}.{func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}'
 314        return f'{func.__class__}({func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}'
 315    else:
 316        return f'{func.__class__}({func.__qualname__}) @ {os.path.basename(func.__code__.co_filename)}:{func.__code__.co_firstlineno}'
 317
 318
 319def full_func_info(my):
 320    if __debug__: dlog(repr(my), 
 321        my.__module__, 
 322        my.__name__, 
 323        my.__qualname__, 
 324        my.__annotations__, 
 325        my.__class__, 
 326        my.__closure__, 
 327        my.__code__, 
 328        my.__code__.co_argcount,
 329        my.__code__.co_cellvars,
 330        my.__code__.co_code,
 331        my.__code__.co_consts,
 332        my.__code__.co_filename,
 333        my.__code__.co_firstlineno,
 334        my.__code__.co_flags,
 335        my.__code__.co_freevars,
 336        my.__code__.co_kwonlyargcount,
 337        my.__code__.co_lnotab,
 338        my.__code__.co_name,
 339        my.__code__.co_names,
 340        my.__code__.co_nlocals,
 341        my.__code__.co_stacksize,
 342        my.__code__.co_varnames,
 343        )
 344
 345
 346def full_func_info_to_dict(my):
 347    return {
 348        'repr': repr(my), 
 349        'module': str(my.__module__), 
 350        'name': str(my.__name__), 
 351        'qualname': str(my.__qualname__), 
 352        'annotations': str(my.__annotations__), 
 353        'class': str(my.__class__), 
 354        'closure': str(my.__closure__), 
 355        'code': str(my.__code__), 
 356        'co_argcount': str(my.__code__.co_argcount),
 357        'co_cellvars': str(my.__code__.co_cellvars),
 358        'co_code': str(my.__code__.co_code),
 359        'co_consts': str(my.__code__.co_consts),
 360        'co_filename': str(my.__code__.co_filename),
 361        'co_firstlineno': str(my.__code__.co_firstlineno),
 362        'co_flags': str(my.__code__.co_flags),
 363        'co_freevars': str(my.__code__.co_freevars),
 364        'co_kwonlyargcount': str(my.__code__.co_kwonlyargcount),
 365        'co_lnotab': str(my.__code__.co_lnotab),
 366        'co_name': str(my.__code__.co_name),
 367        'co_names': str(my.__code__.co_names),
 368        'co_nlocals': str(my.__code__.co_nlocals),
 369        'co_stacksize': str(my.__code__.co_stacksize),
 370        'co_varnames': str(my.__code__.co_varnames),
 371    }
 372
 373
 374def full_func_info_to_printable_dict(func: Callable) -> Dict[str, str]:
 375    func_info: Dict[str, str] = full_func_info_to_dict(func)
 376    good_keys = {'module', 'qualname', 'class', 'co_filename', 'co_firstlineno'}
 377    result: Dict[str, str] = dict()
 378    for key, value in func_info.items():
 379        if key in good_keys:
 380            result[key] = value
 381    
 382    return result
 383
 384
 385class OutsideCoroSchedulerContext(Exception):
 386    pass
 387
 388
 389def current_coro_scheduler() -> 'CoroSchedulerType':
 390    if _current_coro_scheduler.scheduler is None:
 391        raise OutsideCoroSchedulerContext
 392    
 393    return _current_coro_scheduler.scheduler
 394
 395
 396def get_current_coro_scheduler() -> Optional['CoroSchedulerType']:
 397    return _current_coro_scheduler.scheduler
 398
 399
 400def set_primary_coro_scheduler(coro_scheduler: 'CoroSchedulerType'):
 401    _primary_coro_scheduler.scheduler = coro_scheduler
 402
 403
 404class PrimaryCoroSchedulerWasNotSet(Exception):
 405    pass
 406
 407
 408def primary_coro_scheduler() -> 'CoroSchedulerType':
 409    if _primary_coro_scheduler.scheduler is None:
 410        raise PrimaryCoroSchedulerWasNotSet
 411    
 412    return _primary_coro_scheduler.scheduler
 413
 414
 415def get_primary_coro_scheduler() -> Optional['CoroSchedulerType']:
 416    return _primary_coro_scheduler.scheduler
 417
 418
 419class CoroSchedulerContextIsNotAvailable(Exception):
 420    pass
 421
 422
 423def available_coro_scheduler() -> 'CoroSchedulerType':
 424    if _current_coro_scheduler.scheduler:
 425        return _current_coro_scheduler.scheduler
 426    elif _primary_coro_scheduler.scheduler:
 427        return _primary_coro_scheduler.scheduler
 428    else:
 429        raise CoroSchedulerContextIsNotAvailable
 430
 431
 432def get_available_coro_scheduler() -> Optional['CoroSchedulerType']:
 433    if _current_coro_scheduler.scheduler:
 434        return _current_coro_scheduler.scheduler
 435    elif _primary_coro_scheduler.scheduler:
 436        return _primary_coro_scheduler.scheduler
 437    else:
 438        return None
 439
 440
 441class WrongTypeOfShedulerError(Exception):
 442    pass
 443
 444
 445class InterfaceIsNotAvailableError(Exception):
 446    pass
 447
 448
 449class CurrentCoroIsNotAliveError(Exception):
 450    pass
 451
 452
 453# ==========================================================
 454
 455
 456def loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType':
 457    if backup_scheduler is not None:
 458        if not isinstance(backup_scheduler, CoroScheduler):
 459            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 460
 461    loop = CoroScheduler.current_loop()
 462    if loop is None:
 463        # Outside the loop
 464        loop = get_available_coro_scheduler()
 465        if loop is None:
 466            loop = backup_scheduler
 467
 468    if loop is None:
 469        raise CoroSchedulerContextIsNotAvailable
 470    
 471    return loop
 472
 473
 474def get_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']:
 475    if backup_scheduler is not None:
 476        if not isinstance(backup_scheduler, CoroScheduler):
 477            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 478
 479    loop = CoroScheduler.current_loop()
 480    if loop is None:
 481        # Outside the loop
 482        loop = get_available_coro_scheduler()
 483        if loop is None:
 484            loop = backup_scheduler
 485    
 486    return loop
 487
 488
 489def loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType':
 490    loop = explicit_scheduler
 491    current_loop = CoroScheduler.current_loop()
 492    if loop is None:
 493        loop = current_loop
 494        if loop is None:
 495            # Outside the loop
 496            loop = get_available_coro_scheduler()
 497    else:
 498        if not isinstance(loop, CoroScheduler):
 499            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 500
 501    if loop is None:
 502        raise CoroSchedulerContextIsNotAvailable
 503    
 504    return loop
 505
 506
 507def get_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']:
 508    loop = explicit_scheduler
 509    current_loop = CoroScheduler.current_loop()
 510    if loop is None:
 511        loop = current_loop
 512        if loop is None:
 513            # Outside the loop
 514            loop = get_available_coro_scheduler()
 515    else:
 516        if not isinstance(loop, CoroScheduler):
 517            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 518    
 519    return loop
 520# ==========================================================
 521
 522
 523def interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]:
 524    if backup_scheduler is not None:
 525        if not isinstance(backup_scheduler, CoroScheduler):
 526            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 527
 528    loop = CoroScheduler.current_loop()
 529    if loop is None:
 530        # Outside the loop
 531        interface = None
 532        loop = get_available_coro_scheduler()
 533        if loop is None:
 534            loop = backup_scheduler
 535    else:
 536        # In the loop (in coroutine or in the service)
 537        interface = loop.current_interface()
 538
 539    if loop is None:
 540        raise CoroSchedulerContextIsNotAvailable
 541    
 542    if interface is None:
 543        raise InterfaceIsNotAvailableError
 544    
 545    coro_alive: bool = False
 546    if interface is not None:
 547        if interface._coro:
 548            coro_alive = True
 549    
 550    if not coro_alive:
 551        raise CurrentCoroIsNotAliveError
 552    
 553    return loop, interface, coro_alive
 554
 555
 556def get_interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
 557    if backup_scheduler is not None:
 558        if not isinstance(backup_scheduler, CoroScheduler):
 559            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 560
 561    loop = CoroScheduler.current_loop()
 562    if loop is None:
 563        # Outside the loop
 564        interface = None
 565        loop = get_available_coro_scheduler()
 566        if loop is None:
 567            loop = backup_scheduler
 568    else:
 569        # In the loop (in coroutine or in the service)
 570        interface = loop.current_interface()
 571    
 572    coro_alive: bool = False
 573    if interface is not None:
 574        if interface._coro:
 575            coro_alive = True
 576    
 577    return loop, interface, coro_alive
 578
 579
 580def interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]:
 581    loop = explicit_scheduler
 582    current_loop = CoroScheduler.current_loop()
 583    interface = None
 584    if loop is None:
 585        loop = current_loop
 586        if loop is None:
 587            # Outside the loop
 588            loop = get_available_coro_scheduler()
 589        else:
 590            # In the loop (in coroutine or in the service)
 591            interface = loop.current_interface()
 592    else:
 593        if isinstance(loop, CoroScheduler):
 594            if loop is current_loop:
 595                interface = loop.current_interface()
 596        else:
 597            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 598
 599    if loop is None:
 600        raise CoroSchedulerContextIsNotAvailable
 601    
 602    if interface is None:
 603        raise InterfaceIsNotAvailableError
 604    
 605    coro_alive: bool = False
 606    if interface is not None:
 607        if interface._coro:
 608            coro_alive = True
 609    
 610    if not coro_alive:
 611        raise CurrentCoroIsNotAliveError
 612    
 613    return loop, interface, coro_alive
 614
 615
 616def get_interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
 617    loop = explicit_scheduler
 618    current_loop = CoroScheduler.current_loop()
 619    interface = None
 620    if loop is None:
 621        loop = current_loop
 622        if loop is None:
 623            # Outside the loop
 624            loop = get_available_coro_scheduler()
 625        else:
 626            # In the loop (in coroutine or in the service)
 627            interface = loop.current_interface()
 628    else:
 629        if isinstance(loop, CoroScheduler):
 630            if loop is current_loop:
 631                interface = loop.current_interface()
 632        else:
 633            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 634    
 635    coro_alive: bool = False
 636    if interface is not None:
 637        if interface._coro:
 638            coro_alive = True
 639    
 640    return loop, interface, coro_alive
 641
 642
 643def interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple['CoroSchedulerType', 'Interface', bool]:
 644    loop = explicit_scheduler
 645    interface = None
 646    if loop is None:
 647        raise CoroSchedulerContextIsNotAvailable
 648    else:
 649        if isinstance(loop, CoroScheduler):
 650            if loop is CoroScheduler.current_loop():
 651                interface = loop.current_interface()
 652        else:
 653            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 654    
 655    if interface is None:
 656        raise InterfaceIsNotAvailableError
 657    
 658    coro_alive: bool = False
 659    if interface is not None:
 660        if interface._coro:
 661            coro_alive = True
 662    
 663    if not coro_alive:
 664        raise CurrentCoroIsNotAliveError
 665    
 666    return loop, interface, coro_alive
 667
 668
 669def get_interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
 670    loop = explicit_scheduler
 671    interface = None
 672    if loop is not None:
 673        if isinstance(loop, CoroScheduler):
 674            if loop is CoroScheduler.current_loop():
 675                interface = loop.current_interface()
 676        else:
 677            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 678    
 679    coro_alive: bool = False
 680    if interface is not None:
 681        if interface._coro:
 682            coro_alive = True
 683    
 684    return loop, interface, coro_alive
 685
 686
 687# ==========================================================
 688
 689def service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
 690    if backup_scheduler is not None:
 691        if not isinstance(backup_scheduler, CoroScheduler):
 692            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 693
 694    loop = CoroScheduler.current_loop()
 695    if loop is None:
 696        # Outside the loop
 697        loop = get_available_coro_scheduler()
 698        if loop is None:
 699            loop = backup_scheduler
 700
 701    if loop is None:
 702        raise CoroSchedulerContextIsNotAvailable
 703
 704    return loop.get_service_instance(service_type)
 705
 706
 707def get_service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
 708    if backup_scheduler is not None:
 709        if not isinstance(backup_scheduler, CoroScheduler):
 710            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 711
 712    loop = CoroScheduler.current_loop()
 713    if loop is None:
 714        # Outside the loop
 715        loop = get_available_coro_scheduler()
 716        if loop is None:
 717            loop = backup_scheduler
 718
 719    if loop is None:
 720        return None
 721    else:
 722        return loop.get_service_instance(service_type)
 723
 724
 725def service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
 726    loop = explicit_scheduler
 727    current_loop = CoroScheduler.current_loop()
 728    if loop is None:
 729        loop = current_loop
 730        if loop is None:
 731            # Outside the loop
 732            loop = get_available_coro_scheduler()
 733    else:
 734        if not isinstance(loop, CoroScheduler):
 735            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 736
 737    if loop is None:
 738        raise CoroSchedulerContextIsNotAvailable
 739
 740    return loop.get_service_instance(service_type)
 741
 742
 743def get_service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
 744    loop = explicit_scheduler
 745    current_loop = CoroScheduler.current_loop()
 746    if loop is None:
 747        loop = current_loop
 748        if loop is None:
 749            # Outside the loop
 750            loop = get_available_coro_scheduler()
 751    else:
 752        if not isinstance(loop, CoroScheduler):
 753            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 754
 755    if loop is None:
 756        return None
 757    else:
 758        return loop.get_service_instance(service_type)
 759# ==========================================================
 760
 761
 762def service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
 763    if backup_scheduler is not None:
 764        if not isinstance(backup_scheduler, CoroScheduler):
 765            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 766
 767    loop = CoroScheduler.current_loop()
 768    if loop is None:
 769        # Outside the loop
 770        loop = get_available_coro_scheduler()
 771        if loop is None:
 772            loop = backup_scheduler
 773
 774    if loop is None:
 775        raise CoroSchedulerContextIsNotAvailable
 776
 777    return loop.get_service_instance_fast(service_type)
 778
 779
 780def get_service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
 781    if backup_scheduler is not None:
 782        if not isinstance(backup_scheduler, CoroScheduler):
 783            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
 784
 785    loop = CoroScheduler.current_loop()
 786    if loop is None:
 787        # Outside the loop
 788        loop = get_available_coro_scheduler()
 789        if loop is None:
 790            loop = backup_scheduler
 791
 792    if loop is None:
 793        return None
 794    else:
 795        return loop.get_service_instance_fast(service_type)
 796
 797
 798def service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
 799    loop = explicit_scheduler
 800    current_loop = CoroScheduler.current_loop()
 801    if loop is None:
 802        loop = current_loop
 803        if loop is None:
 804            # Outside the loop
 805            loop = get_available_coro_scheduler()
 806    else:
 807        if not isinstance(loop, CoroScheduler):
 808            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 809
 810    if loop is None:
 811        raise CoroSchedulerContextIsNotAvailable
 812
 813    return loop.get_service_instance_fast(service_type)
 814
 815
 816def get_service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
 817    loop = explicit_scheduler
 818    current_loop = CoroScheduler.current_loop()
 819    if loop is None:
 820        loop = current_loop
 821        if loop is None:
 822            # Outside the loop
 823            loop = get_available_coro_scheduler()
 824    else:
 825        if not isinstance(loop, CoroScheduler):
 826            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
 827
 828    if loop is None:
 829        return None
 830    else:
 831        return loop.get_service_instance_fast(service_type)
 832# ==========================================================
 833
 834
 835class CoroType(Enum):
 836    auto = 0
 837    awaitable = 1
 838    greenlet = 2
 839
 840
 841class ExplicitWorker:
 842    def __init__(self, coro_type: CoroType, worker: Worker) -> None:
 843        self.coro_type: CoroType = coro_type
 844        self.worker: Worker = worker
 845
 846
 847AnyWorker = Union[ExplicitWorker, Worker]
 848
 849
 850@types.coroutine
 851def yield_task_from_asyncawait(request: 'Request') -> Generator['Response', Any, Any]:
 852    response = yield request  # type: Response
 853    return response
 854
 855
 856class EntityStatsMixin:
 857    class StatsLevel(Enum):
 858        info = 0
 859        debug = 1
 860    
 861    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel') -> Tuple[str, Dict[str, Any]]:
 862        raise NotImplementedError
 863
 864
 865class CoroSchedulerIsCurrentlyDestroingError(Exception):
 866    pass
 867
 868
 869class CoroSchedulerDestroyException(Exception):
 870    pass
 871
 872
 873class CoroSchedulerDestroyRequestedException(Exception):
 874    pass
 875
 876
 877class CoroSchedulerBase(Iterable, EntityStatsMixin):
 878    def __init__(self, logger: Optional[logging.Logger] = None):
 879        self.in_iteration = False
 880        self.services = dict()                                    # type: Dict[ServiceType, Service]
 881        self.live_services = dict()                               # type: Dict[ServiceType, Service]
 882        self.live_low_latency_services = dict()                               # type: Dict[ServiceType, Service]
 883        self.requests = list()                                    # type: List[Request]
 884        self.responses = list()                                   # type: List[Response]
 885        self.new_born_coroutines = list()                         # type: List[CoroWrapperBase]
 886        self.coroutines = dict()                                  # type: Dict[CoroID, CoroWrapperBase]
 887        self.coro_counter = Counter()                             # type: Counter
 888        self.services_in_work = 0                                 # type: int
 889        self.services_in_foreground_work: int = 0
 890        self.services_in_active_state = 0                                 # type: int
 891        self.services_in_active_state_list = list()               # uncomment lines in code for a debutting purposes (when some service prevents loop to go to sleep)
 892        self.time_left_before_next_event: Optional[float] = None  # type: Optional[float]
 893        self.coro_on_del_handlers = set()                         # type: Set[Callable]
 894        self.current_coro_interface = None                        # type: Optional['Interface']
 895        self.current_coro_wrapper: CoroWrapperBase = None
 896        self.suppress_coro_exceptions = True                      # type: bool
 897        self.iteration_index: int = 0
 898        self.context_switches = 0                                 # type: int
 899        self.current_coro_start_time: Optional[float] = None
 900        self.coro_execution_time: Dict[int, float] = dict()
 901        self.coro_longest_execution_time: Dict[int, float] = dict()
 902        self.loop_iteration_start_time = 0.0
 903        self.need_to_measure_loop_iteration_time = False          # type: bool
 904        self.need_to_measure_coro_time = False                    # type: bool
 905        self.need_to_gather_coro_history = False                  # type: bool
 906        self.permitted_use_put_coro_from_coro = False             # type: bool
 907        self.get_coro_start_time = self._fake_perf_counter
 908        self.get_loop_iteration_start_time = self._fake_perf_counter
 909        self.coro_history_gatherer = self._fake_method
 910        self.on_woke_up_callback = self._fake_method
 911        self.coro_on_start_handlers = set()
 912        self.global_on_start_handlers_turned_on = False
 913        self.execute_global_on_start_handlers = self._fake_execute_global_on_start_handlers
 914        self.coro_workers_history = dict()                                # type: Dict[Worker, Set[CoroID]]
 915        self.coro_full_history = dict()                                # type: Dict[CoroID, CoroWrapperBase]
 916        self.run_low_latency_services = self._fake_run_low_latency_services
 917        if logger is None:
 918            self.logger = logging.getLogger(f'cengal.coro_scheduler.{id(self)}')
 919            self.logger_stream_handler = logging.StreamHandler()
 920            self.logger.addHandler(self.logger_stream_handler)
 921            self.logger.setLevel(logging.INFO)
 922        else:
 923            self.logger = logger
 924        
 925        self._in_work: bool = None
 926        self._destroyed: bool = False
 927        self.services_impostrors: Dict = dict()
 928        self.keep_coro_execution_time_between_iterations = False
 929        self.keep_coro_longest_execution_time_between_iterations = False
 930        self.keep_coro_workers_history_between_iterations = False
 931        self.keep_coro_full_history_between_iterations = False
 932        self.loop_iteration_delta_time = 0.0
 933        self.suppress_warnings_about_responses_to_not_existant_coroutines: bool = False
 934        self.use_internal_sleep: bool = True
 935        self.high_cpu_utilisation_mode: bool = False  # For an effective, low-latency, inter-thread switching when there is more active high-CPU utilizing threads than CPU cores. Should be switched off for GUI applications in order to to decrease CPU utilization.
 936        self.foreground_coro_num: int = 0
 937        self.on_idle_handlers: Set[Callable] = set()
 938        self.idle_managers_num: int = 0
 939        self.on_wrong_request: Optional[Callable] = None
 940        self.get_service_by_type: Callable = self.get_service_by_type_impl
 941        self.get_service_instance_fast: Callable = self.get_service_instance_fast_impl
 942        self.get_service_instance: Callable = self.get_service_instance_impl
 943        self.make_service_live_fast: Callable = self.make_service_live_fast_impl
 944        self.make_service_live: Callable = self.make_service_live_impl
 945        self.make_service_dead_fast: Callable = self.make_service_dead_fast_impl
 946        self.make_service_dead: Callable = self.make_service_dead_impl
 947        self.sliding_window = deque(maxlen=1000)
 948        self.need_to_log_loop_start_and_end: bool = True
 949        self.on_destroyed_handlers: Set[Callable] = set()
 950        self._coroutines_can_switch_directly: bool = False
 951        self._next_coroutine: Optional[CoroWrapperBase] = None
 952        self._is_current_coro_was_new_born: Union[bool, None] = None
 953
 954    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 955        if EntityStatsMixin.StatsLevel.info == stats_level:
 956            func_info = full_func_info_to_printable_dict
 957        else:
 958            func_info = full_func_info_to_dict
 959        
 960        services_stats = dict()
 961        for service in self.services.values():
 962            if isinstance(service, EntityStatsMixin):
 963                name, info = service.get_entity_stats(stats_level)
 964                services_stats[name] = info
 965
 966        return type(self).__name__, {
 967            'loop': {
 968                'services': {
 969                    'num': len(self.services),
 970                    'list': [service_type.__name__ for service_type in self.services.keys()],
 971                },
 972                'live services': {
 973                    'in work num': self.services_in_work,
 974                    'in foreground work num': self.services_in_foreground_work,
 975                    'services in active state': self.services_in_active_state,
 976                    'idleble services': self.services_in_work - self.services_in_active_state,
 977                    'num': len(self.live_services),
 978                    'list': [service_type.__name__ for service_type in self.live_services.keys()],
 979                },
 980                'live low latency services': {
 981                    'num': len(self.live_low_latency_services),
 982                    'list': [service_type.__name__ for service_type in self.live_low_latency_services.keys()],
 983                },
 984                'new born coroutines': {
 985                    'num': len(self.new_born_coroutines),
 986                    "list": {
 987                        wrapper.coro_id: {
 988                            "func_info": func_info(self.purify_worker(wrapper.worker)),
 989                            "args": [repr(arg) for arg in wrapper.init_args],
 990                            "kwargs": {
 991                                repr(key): repr(value)
 992                                for key, value in wrapper.init_kwargs.items()
 993                            },
 994                        }
 995                        for wrapper in self.new_born_coroutines
 996                    },
 997                },
 998                'coroutines': {
 999                    'num': len(self.coroutines),
1000                    "list": {
1001                        coro_id: {
1002                            "func_info": func_info(self.purify_worker(wrapper.worker)),
1003                            "args": [repr(arg) for arg in wrapper.init_args],
1004                            "kwargs": {
1005                                repr(key): repr(value)
1006                                for key, value in wrapper.init_kwargs.items()
1007                            },
1008                        }
1009                        for coro_id, wrapper in self.coroutines.items()
1010                    },
1011                },
1012                'coro counter': self.coro_counter._index,
1013                'coro on del handlers num': len(self.coro_on_del_handlers),
1014                'coro on del handlers': {
1015                    'num': len(self.coro_on_del_handlers),
1016                    'list': [func_info(handler) for handler in self.coro_on_del_handlers]
1017                },
1018                'suppress coro exceptions': self.suppress_coro_exceptions,
1019                'context switches num': self.context_switches,
1020                'longest continuous execution time of coroutines': self.coro_longest_execution_time,
1021                'coroutines execution times': self.coro_execution_time,
1022                'need to measure loop iteration time': self.need_to_measure_loop_iteration_time,
1023                'need to measure coro time': self.need_to_measure_coro_time,
1024                'need to gather coro history': self.need_to_gather_coro_history,
1025                'coro workers history': {
1026                    'num': len(self.coro_workers_history),
1027                    'list': [{
1028                        'worker': func_info(self.purify_worker(worker)),
1029                        'coroutines': coroutines,
1030                    } for worker, coroutines in self.coro_workers_history.items()]
1031                },
1032                'coro full history': {
1033                    'num': len(self.coro_full_history),
1034                    'list': {coro_id: {
1035                        'worker': func_info(self.purify_worker(wrapper.worker)),
1036                        'args': [repr(arg) for arg in wrapper.init_args],
1037                        'kwargs': {repr(key): repr(value) for key, value in wrapper.init_kwargs.items()},
1038                    } for coro_id, wrapper in self.coro_full_history.items()}
1039                },
1040               'coro loop iteration delta time':self.loop_iteration_delta_time, 
1041            },
1042            'services': services_stats,
1043        }
1044
1045    def loop(self):
1046        raise NotImplementedError
1047    
1048    def iteration(self):
1049        raise NotImplementedError
1050    
1051    def log_exc(self, coro: 'CoroWrapperBase', exception: Exception):
1052        worker_info = full_func_info_to_printable_dict(self.purify_worker(coro.worker))
1053        error_text = f'\nAn exception in coroutine "{worker_info["qualname"]}"\n\tModule "{worker_info["module"]}"\n\tFile "{worker_info["co_filename"]}", line {worker_info["co_firstlineno"]}:\n{exception_to_printable_text(exception)}'
1054        self.logger.error(f'{datetime.now()} >> {error_text}')
1055
1056    # @property
1057    # def root_coro(self):
1058    #     return self._root_coro
1059    
1060    # @root_coro.setter
1061    # def root_coro(self, root_coro):
1062    #     if root_coro is None:
1063    #         if __debug__: dlog('root_coro = None')
1064    #     else:
1065    #         if __debug__: dlog(f'root_coro = {root_coro}')
1066    #     self._root_coro = root_coro
1067    
1068    def _fake_method(self, *args, **kwargs):
1069        pass
1070    
1071    @staticmethod
1072    def _fake_perf_counter():
1073        return 0.0
1074    
1075    def get_service_instance_fast_impl(self, service_type: ServiceType):
1076        return self.services[service_type]
1077    
1078    def get_service_instance_impl(self, service_type: ServiceType):
1079        self.register_service(service_type)
1080        return self.get_service_instance_fast_impl(service_type)
1081    
1082    def make_service_live_fast_impl(self, service_type: ServiceType, is_low_latency: bool = False):
1083        service: Optional[Service] = self.services.get(service_type, None)
1084        if service is not None:
1085            self.live_services[service_type] = self.services[service_type]
1086            if is_low_latency:
1087                self.live_low_latency_services[service_type] = self.services[service_type]
1088    
1089    def make_service_live_impl(self, service_type: ServiceType, is_low_latency: bool = False):
1090        self.register_service(service_type)
1091        self.make_service_live_fast_impl(service_type, is_low_latency)
1092    
1093    def make_service_dead_fast_impl(self, service_type: ServiceType):
1094        self.live_services.pop(service_type, None)
1095        self.live_low_latency_services.pop(service_type, None)
1096    
1097    def make_service_dead_impl(self, service_type: ServiceType):
1098        self.register_service(service_type)
1099        self.make_service_dead_fast_impl(service_type)
1100    
1101    def get_service_instance_fast_impostor_impl(self, service_type: ServiceType):
1102        return self.get_service_instance_fast_impl(self.services_impostrors.get(service_type, service_type))
1103    
1104    def get_service_instance_impostor_impl(self, service_type: ServiceType):
1105        return self.get_service_instance_impl(self.services_impostrors.get(service_type, service_type))
1106    
1107    def make_service_live_fast_impostor_impl(self, service_type: ServiceType, is_low_latency: bool = False):
1108        return self.make_service_live_fast_impl(self.services_impostrors.get(service_type, service_type), is_low_latency)
1109    
1110    def make_service_live_impostor_impl(self, service_type: ServiceType, is_low_latency: bool = False):
1111        return self.make_service_live_impl(self.services_impostrors.get(service_type, service_type), is_low_latency)
1112    
1113    def make_service_dead_fast_impostor_impl(self, service_type: ServiceType):
1114        return self.make_service_dead_fast_impl(self.services_impostrors.get(service_type, service_type))
1115    
1116    def make_service_dead_impostor_impl(self, service_type: ServiceType):
1117        return self.make_service_dead_impl(self.services_impostrors.get(service_type, service_type))
1118
1119    def set_coro_time_measurement(self, need_to_measure_coro_time: bool):
1120        previous_value = self.need_to_measure_coro_time
1121        self.need_to_measure_coro_time = need_to_measure_coro_time
1122        if need_to_measure_coro_time:
1123            self.get_coro_start_time = perf_counter
1124        else:
1125            self.get_coro_start_time = self._fake_perf_counter
1126        
1127        return previous_value
1128
1129    def set_coro_history_gathering(self, need_to_gather_coro_history: bool):
1130        previous_value = self.need_to_gather_coro_history
1131        self.need_to_gather_coro_history = need_to_gather_coro_history
1132        if need_to_gather_coro_history:
1133            self.coro_history_gatherer = self._default_coro_history_gatherer
1134        else:
1135            self.coro_history_gatherer = self._fake_method
1136        
1137        return previous_value
1138    
1139    def _default_coro_history_gatherer(self, original_worker: Callable, coro_wrapper: 'CoroWrapperBase'):
1140        coro_id: CoroID = coro_wrapper.coro_id
1141        coro_worker: Worker = coro_wrapper.worker
1142        
1143        self.coro_full_history[coro_id] = coro_wrapper
1144        
1145        if original_worker not in self.coro_workers_history:
1146            self.coro_workers_history[original_worker] = set()
1147        
1148        self.coro_workers_history[original_worker].add(coro_id)
1149
1150    def set_loop_iteration_time_measurement(self, need_to_measure_loop_iteration_time: bool):
1151        previous_value = self.need_to_measure_loop_iteration_time
1152        self.need_to_measure_loop_iteration_time = need_to_measure_loop_iteration_time
1153        if need_to_measure_loop_iteration_time:
1154            self.get_loop_iteration_start_time = perf_counter
1155        else:
1156            self.get_loop_iteration_start_time = self._fake_perf_counter
1157        
1158        return previous_value
1159    
1160    def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType:
1161        raise NotImplementedError
1162
1163    def put_coro_fast(self, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs) -> 'CoroWrapperBase':
1164        """Must not be called from coroutine. Use an appropriate service instead, since leads to incorrect greenlets switches
1165        """
1166        if (self.current_coro_interface is None) or self.permitted_use_put_coro_from_coro:
1167            coro_type = self._new_coro_type_normalizer(coro_type)
1168            coro_id = self.coro_counter.get()
1169            self.coro_execution_time[coro_id] = 0.0
1170            self.coro_longest_execution_time[coro_id] = 0.0
1171            coro = coro_wrapper_factory(coro_type, self, coro_id, coro_worker, *args, **kwargs)
1172            if isinstance(coro_worker, EntityArgsHolder):
1173                self.coro_history_gatherer(coro_worker.entity, coro)
1174            else:
1175                self.coro_history_gatherer(coro_worker, coro)
1176            
1177            self.new_born_coroutines.append(coro)
1178            if not self.in_iteration:
1179                self.on_woke_up_callback()
1180            
1181            return coro
1182        else:
1183            raise RuntimeError('CoroScheduler.put_coro() method must not be called from coroutine. Use an appropriate service instead')
1184
1185    def put_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> 'CoroWrapperBase':
1186        if isinstance(coro_worker, EntityArgsHolder):
1187            coro_worker_real, args, kwargs = coro_worker.entity_args_kwargs()
1188            if isinstance(coro_worker, ExplicitWorker):
1189                coro_type: CoroType = coro_worker.coro_type
1190                coro_worker: EntityArgsHolderExplicit = EntityArgsHolderExplicit(coro_worker_real.worker, args, kwargs)
1191                return self.put_coro_fast(coro_type, coro_worker)
1192            else:
1193                return self.put_coro_fast(None, coro_worker)
1194        else:
1195            if isinstance(coro_worker, ExplicitWorker):
1196                return self.put_coro_fast(coro_worker.coro_type, coro_worker.worker, *args, **kwargs)
1197            else:
1198                return self.put_coro_fast(None, coro_worker, *args, **kwargs)
1199
1200    def register_service(self, service_type: ServiceType) -> bool:
1201        if service_type in self.services:
1202            return False
1203        else:
1204            service = service_type(self)
1205            self.services[service_type] = service
1206            self.live_services[service_type] = service
1207            return True
1208    
1209    def is_service_registered(self, service_type: ServiceType) -> bool:
1210        return service_type in self.services
1211    
1212    def unregister_service(self, service_type: ServiceType) -> bool:
1213        if service_type not in self.services:
1214            return True
1215        
1216        service = self.services[service_type]
1217        self.services.pop(service_type, None)
1218        self.live_services.pop(service_type, None)
1219        self.live_low_latency_services.pop(service_type, None)
1220        try:
1221            service.destroy()
1222        except:
1223            self.logger.exception(f'{datetime.now()} >> Service {service_type} failed to destroy')
1224            return False
1225        
1226        return True
1227    
1228    def destroy_services(self):
1229        for service_type in tuple(self.services):
1230            self.unregister_service(service_type)
1231    
1232    def get_service_by_type_impl(self, service_type: Type['Service']) -> 'Service':
1233        try:
1234            return self.services[service_type]
1235        except KeyError:
1236            self.register_service(service_type)
1237            return self.services[service_type]
1238    
1239    def get_service_by_type_impostor_impl(self, service_type: Type['Service']) -> 'Service':
1240        return self.get_service_by_type_impl(self.services_impostrors.get(service_type, service_type))
1241    
1242    def register_service_impostor(self, service_type: Type['Service'], service_impostor_type: Optional[Type['Service']]) -> Optional[Type['Service']]:
1243        if service_impostor_type is None:
1244            an_old_impostor: Optional[Type['Service']] = self.services_impostrors.pop(service_type, None)
1245        else:
1246            an_old_impostor = self.services_impostrors.get(service_type, None)
1247            self.services_impostrors[service_type] = service_impostor_type
1248        
1249        if self.services_impostrors:
1250            self.get_service_by_type = self.get_service_by_type_impostor_impl
1251            self.get_service_instance_fast = self.get_service_instance_fast_impostor_impl
1252            self.get_service_instance = self.get_service_instance_impostor_impl
1253            self.make_service_live_fast = self.make_service_live_fast_impostor_impl
1254            self.make_service_live = self.make_service_live_impostor_impl
1255            self.make_service_dead_fast = self.make_service_dead_fast_impostor_impl
1256            self.make_service_dead = self.make_service_dead_impostor_impl
1257        else:
1258            self.get_service_by_type = self.get_service_by_type_impl
1259            self.get_service_instance_fast = self.get_service_instance_fast_impl
1260            self.get_service_instance = self.get_service_instance_impl
1261            self.make_service_live_fast = self.make_service_live_fast_impl
1262            self.make_service_live = self.make_service_live_impl
1263            self.make_service_dead_fast = self.make_service_dead_fast_impl
1264            self.make_service_dead = self.make_service_dead_impl
1265        
1266        return an_old_impostor
1267
1268    def find_new_born_coroutine(self, coro_id: CoroID) -> int:
1269        for index, coro in enumerate(self.new_born_coroutines):
1270            if coro.coro_id == coro_id:
1271                return index
1272        
1273        return None
1274    
1275    def get_coro(self, coro_id: CoroID) -> Optional['CoroWrapperBase']:
1276        if coro_id in self.coroutines:
1277            return self.coroutines[coro_id]
1278        else:
1279            new_born_coro_index = self.find_new_born_coroutine(coro_id)
1280            if new_born_coro_index is None:
1281                return None
1282            
1283            return self.new_born_coroutines[new_born_coro_index]
1284    
1285    @staticmethod
1286    def purify_worker(worker: Union[Callable, 'GreenletWorkerWrapper']) -> Callable:
1287        if isinstance(worker, GreenletWorkerWrapper):
1288            return worker.worker
1289        else:
1290            return worker
1291
1292    def set_global_on_start_handlers(self, turn_on_global_on_start_handlers: bool):
1293        self.global_on_start_handlers_turned_on = turn_on_global_on_start_handlers
1294        if turn_on_global_on_start_handlers:
1295            self.execute_global_on_start_handlers = self._execute_global_on_start_handlers
1296        else:
1297            self.execute_global_on_start_handlers = self._fake_execute_global_on_start_handlers
1298
1299    def add_on_global_on_start_handler(self, handler: Callable):
1300        self.coro_on_start_handlers.add(handler)
1301
1302    def _execute_global_on_start_handlers(self, coro: 'CoroWrapperBase') -> bool:
1303        # executes before new born corotine execution
1304        # if at least one of the handlers will return False - coro will not be started at all and removed from the coroutines list
1305        result = True
1306        for handler in self.coro_on_start_handlers:
1307            result &= handler(coro)
1308        
1309        return result
1310    
1311    def _fake_execute_global_on_start_handlers(self, coro: 'CoroWrapperBase') -> bool:
1312        # executes before new born corotine execution
1313        # if at least one of the handlers will return False - coro will not be started at all and removed from the coroutines list
1314        return True
1315    
1316    
1317    def process_coro_exit_status(self, coro: 'CoroWrapperBase', coro_exit_status: Optional['CoroExitStatus']):
1318        try:
1319            if coro_exit_status is None:
1320                self._run_global_on_coro_del_handlers(coro)
1321            else:
1322                if coro_exit_status.exception and (not coro_exit_status.properly_handled) and (not self.coro_on_del_handlers) and (not self.suppress_coro_exceptions):
1323                    raise coro_exit_status.exception
1324
1325                coro_exit_status.properly_handled = self._run_global_on_coro_del_handlers(coro) or coro_exit_status.properly_handled
1326
1327                if coro_exit_status.exception and (not coro_exit_status.properly_handled):
1328                    if self.suppress_coro_exceptions:
1329                        # if __debug__: dlog(coro_exit_status.exception)
1330                        # dlog(coro_exit_status.exception)
1331                        self.log_exc(coro, coro_exit_status.exception)
1332                    else:
1333                        raise coro_exit_status.exception
1334        finally:
1335            coro.destroy()
1336            del coro
1337    
1338    def kill_coro(self, coro: 'CoroWrapperBase'):
1339        coro.request_close()
1340        coro_exit_status: 'CoroExitStatus' = coro()
1341        if not coro:
1342            self.process_coro_exit_status(coro, coro_exit_status)
1343    
1344    def throw_coro(self, coro: 'CoroWrapperBase', ex_type, ex_value=None, ex_traceback=None):
1345        # coro.request_close()
1346        coro.request_throw()  # TODO: Investigate why it raises an exception in the loop and shuts down the loop
1347        coro_exit_status: 'CoroExitStatus' = coro(ex_type, ex_value, ex_traceback)
1348        if not coro:
1349            self.process_coro_exit_status(coro, coro_exit_status)
1350    
1351    def forget_coro_by_id(self, coro: 'CoroWrapperBase') -> Optional['CoroWrapperBase']:
1352        return self.forget_coro_by_id(coro.coro_id)
1353    
1354    def find_coro_by_id(self, coro_id: CoroID) -> Tuple[Optional['CoroWrapperBase'], bool, Optional[int]]:
1355        coro = None
1356        if coro_id in self.coroutines:
1357            was_new_born: bool = False
1358            coro = self.coroutines[coro_id]
1359            return coro, was_new_born, None
1360        else:
1361            was_new_born = True
1362            new_born_index = self.find_new_born_coroutine(coro_id)
1363            if new_born_index is not None:
1364                coro = self.new_born_coroutines[new_born_index]
1365        
1366            return coro, was_new_born, new_born_index
1367
1368    def del_coro_by_id(self, coro_id: CoroID, was_new_born: bool, new_born_index: Optional[int] = None, request: Optional['Request'] = None):
1369        if was_new_born:
1370            del self.new_born_coroutines[new_born_index]
1371        else:
1372            self.coroutines.pop(coro_id, None)
1373            if isinstance(request, Request):
1374                try:
1375                    self.requests.remove(request)
1376                except ValueError:
1377                    pass
1378    
1379    def kill_coro_by_id(self, coro_id: CoroID) -> bool:
1380        if self.current_coro_interface is not None:
1381            raise RuntimeError('CoroScheduler.kill_coro_by_id() Must not be called from the coroutine')
1382        
1383        coro, was_new_born, new_born_index = self.find_coro_by_id(coro_id)
1384        if coro is not None:
1385            request: Optional[Request] = coro.last_result
1386            if was_new_born:
1387                self.process_coro_exit_status(coro, None)
1388            else:
1389                self.kill_coro(coro)
1390            
1391            self.del_coro_by_id(coro_id, was_new_born, new_born_index, request=request)
1392            return True
1393        
1394        return False
1395    
1396    def throw_coro_by_id(self, coro_id: CoroID, ex_type, ex_value=None, ex_traceback=None) -> bool:
1397        if self.current_coro_interface is not None:
1398            raise RuntimeError('CoroScheduler.throw_coro_by_id() Must not be called from the coroutine')
1399        
1400        coro, was_new_born, new_born_index = self.find_coro_by_id(coro_id)
1401        if coro is not None:
1402            request: Optional[Request] = coro.last_result
1403            if was_new_born:
1404                self.process_coro_exit_status(coro, None)
1405            else:
1406                self.throw_coro(coro, ex_type, ex_value, ex_traceback)
1407            
1408            if not coro:
1409                self.del_coro_by_id(coro_id, was_new_born, new_born_index, request=request)
1410
1411            return True
1412        
1413        return False
1414    
1415    def request_coro_throw_by_id(self, coro_id: CoroID) -> Optional['CoroWrapperBase']:
1416        coro, _, _ = self.find_coro_by_id(coro_id)
1417        if coro is not None:
1418            coro.request_throw()
1419        
1420        return coro
1421
1422    def request_coro_close_by_id(self, coro_id: CoroID) -> Optional['CoroWrapperBase']:
1423        coro, _, _ = self.find_coro_by_id(coro_id)
1424        if coro is not None:
1425            coro.request_close()
1426        
1427        return coro
1428    
1429    def _real_run_low_latency_services(self):
1430        for service_type, service in tuple(self.live_low_latency_services.items()):
1431            if service.in_work():
1432                results = service.iteration()
1433                if results:
1434                    # if __debug__:
1435                    #     for result in results:
1436                    #         if __debug__: dlog(f'λ <<< {repr(result)}')
1437
1438                    self.responses.extend(results)
1439    
1440    def _fake_run_low_latency_services(self):
1441        pass
1442    
1443    def turn_on_embedded_mode(self, turn_on: bool = False):
1444        if turn_on:
1445            self.run_low_latency_services = self._real_run_low_latency_services
1446        else:
1447            self.run_low_latency_services = self._fake_run_low_latency_services
1448
1449    def add_global_on_coro_del_handler(self, callback: OnCoroDelHandler):
1450        self.coro_on_del_handlers.add(callback)
1451
1452    def _run_global_on_coro_del_handlers(self, coro: 'CoroWrapperBase') -> bool:
1453        if not self.coro_on_del_handlers:
1454            return False
1455
1456        exception_properly_handled = False
1457        for handler in self.coro_on_del_handlers:
1458            exception_properly_handled = handler(coro) or exception_properly_handled
1459        return exception_properly_handled
1460
1461    def restore_global_state(self):
1462        """
1463        Must be run immediately after `await ...` from inside Coro or Service when running from inside external
1464        async loop
1465        :return:
1466        """
1467        _current_coro_scheduler.scheduler = self
1468
1469    @staticmethod
1470    def current_loop() -> Optional['CoroSchedulerType']:
1471        return _current_coro_scheduler.scheduler
1472
1473    def current_interface(self) -> Optional['Interface']:
1474        return self.current_coro_interface
1475
1476    def current_wrapper(self) -> Optional['CoroWrapperBase']:
1477        return self.current_coro_wrapper
1478    
1479    def destroy_all_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None):
1480        while self.new_born_coroutines or self.coroutines:
1481            new_born_coroutines_buff = self.new_born_coroutines
1482            # self.new_born_coroutines = type(new_born_coroutines_buff)()
1483            self.new_born_coroutines = list()
1484            for coro in new_born_coroutines_buff:
1485                coro: 'CoroWrapperBase' = coro
1486                coro_id: CoroID = coro.coro_id
1487                if (ex_type is not None) or (ex_value is not None):
1488                    self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback)
1489    
1490                if coro:
1491                    self.kill_coro_by_id(coro_id)
1492
1493            coroutines_buff = self.coroutines
1494            # self.coroutines = type(coroutines_buff)()
1495            self.coroutines = type(coroutines_buff)()
1496            for coro in coroutines_buff.values():
1497                coro: 'CoroWrapperBase' = coro
1498                coro_id: CoroID = coro.coro_id
1499                if (ex_type is not None) or (ex_value is not None):
1500                    self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback)
1501                
1502                if coro:
1503                    self.kill_coro_by_id(coro_id)
1504    
1505    def destroy_new_born_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None):
1506        while self.new_born_coroutines:
1507            new_born_coroutines_buff = self.new_born_coroutines
1508            # self.new_born_coroutines = type(new_born_coroutines_buff)()
1509            self.new_born_coroutines = list()
1510            for coro in new_born_coroutines_buff:
1511                coro: 'CoroWrapperBase' = coro
1512                coro_id: CoroID = coro.coro_id
1513                if (ex_type is not None) or (ex_value is not None):
1514                    self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback)
1515    
1516                if coro:
1517                    self.kill_coro_by_id(coro_id)
1518    
1519    def destroy_coroutines(self, ex_type=None, ex_value=None, ex_traceback=None):
1520        while self.coroutines:
1521            coroutines_buff = self.coroutines
1522            # self.coroutines = type(coroutines_buff)()
1523            self.coroutines = dict()
1524            coroutines_buff_values = coroutines_buff.values()
1525            for coro in coroutines_buff_values:
1526                coro: 'CoroWrapperBase' = coro
1527                coro_id: CoroID = coro.coro_id
1528                if (ex_type is not None) or (ex_value is not None):
1529                    self.throw_coro_by_id(coro_id, ex_type, ex_value, ex_traceback)
1530    
1531                if coro:
1532                    self.kill_coro_by_id(coro_id)
1533    
1534    def destroy(self):
1535        if self.need_to_log_loop_start_and_end:
1536            self.logger.info(f'{datetime.now()} >> {type(self).__name__} destroy...')
1537        
1538        if not self._destroyed:
1539            def create_entity_during_destroy(*args, **kwargs):
1540                raise CoroSchedulerIsCurrentlyDestroingError
1541            
1542            put_coro_fast_buf = self.put_coro_fast
1543            self.put_coro_fast = create_entity_during_destroy
1544            register_service_bak = self.register_service
1545            self.register_service = create_entity_during_destroy
1546
1547            try:
1548                while self.new_born_coroutines or self.coroutines:
1549                    new_born_coroutines_buff = self.new_born_coroutines
1550                    # self.new_born_coroutines = type(new_born_coroutines_buff)()
1551                    self.new_born_coroutines = list()
1552                    for coro in new_born_coroutines_buff:
1553                        coro: 'CoroWrapperBase' = coro
1554                        coro_id: CoroID = coro.coro_id
1555                        repr_coro: str = repr(coro)
1556                        try:
1557                            self.throw_coro_by_id(coro_id, CoroSchedulerDestroyException)
1558                        except CoroSchedulerIsCurrentlyDestroingError:
1559                            self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during throw CoroSchedulerDestroyException to new born coroutine {repr_coro}')
1560                            pass
1561                        
1562                        try:
1563                            if coro:
1564                                self.kill_coro_by_id(coro_id)
1565                        except CoroSchedulerIsCurrentlyDestroingError:
1566                            self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during kill new born coroutine {repr_coro}')
1567                            pass
1568
1569                    coroutines_buff = self.coroutines
1570                    # self.coroutines = type(coroutines_buff)()
1571                    self.coroutines = dict()
1572                    for coro in coroutines_buff.values():
1573                        coro: 'CoroWrapperBase' = coro
1574                        coro_id: CoroID = coro.coro_id
1575                        repr_coro: str = repr(coro)
1576                        try:
1577                            self.throw_coro_by_id(coro_id, CoroSchedulerDestroyException)
1578                        except CoroSchedulerIsCurrentlyDestroingError:
1579                            self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during throw CoroSchedulerDestroyException to coroutine {repr_coro}')
1580                            pass
1581
1582                        try:
1583                            if coro:
1584                                self.kill_coro_by_id(coro_id)
1585                        except CoroSchedulerIsCurrentlyDestroingError:
1586                            self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during kill coroutine {repr_coro}')
1587                            pass
1588
1589                    # self.requests = type(self.requests)()
1590                    self.requests = list()
1591                    # self.responses = type(self.responses)()
1592                    self.responses = list()
1593                
1594                for service_type in tuple(self.services):
1595                    try:
1596                        self.unregister_service(service_type)
1597                    except CoroSchedulerIsCurrentlyDestroingError:
1598                        self.logger.warning(f'{datetime.now()} >> Unhandled `CoroSchedulerIsCurrentlyDestroingError` exception during destroy service "{service_type}"')
1599                        pass
1600
1601                # self.requests = type(self.requests)()
1602                self.requests = list()
1603                # self.responses = type(self.responses)()
1604                self.responses = list()
1605            finally:
1606                self.put_coro_fast = put_coro_fast_buf
1607                self.register_service = register_service_bak
1608                self._destroyed = True
1609                for on_destroyed_handler in self.on_destroyed_handlers:
1610                    on_destroyed_handler()
1611
1612                if self.need_to_log_loop_start_and_end:
1613                    self.logger.info(f'{datetime.now()} >> {type(self).__name__} destroyed.')
1614                
1615                self.logger.removeHandler(self.logger_stream_handler)
1616
1617    
1618    def _update_in_work(self):
1619        # self._in_work = self.new_born_coroutines or self.coroutines or self.services_in_work
1620        self._in_work = (self.foreground_coro_num > 0) or self.services_in_foreground_work
1621        return self._in_work
1622
1623    def in_work(self):
1624        in_work_result = self._in_work
1625        return self._update_in_work() if in_work_result is None else in_work_result
1626
1627    def _update_is_awake(self):
1628        self._is_awake = self.new_born_coroutines or self.coroutines or self.live_services or self.live_low_latency_services
1629        return self._is_awake
1630
1631    def is_awake(self):
1632        is_awake_result = self._is_awake
1633        return self._update_is_awake() if is_awake_result is None else is_awake_result
1634
1635    def is_idle(self) -> bool:
1636        # return (self.next_event_after() is not None) and (not self.new_born_coroutines) and (not self.services_in_active_state) and (not self.responses)
1637        return (not self.new_born_coroutines) and (not self.services_in_active_state) and (0 == (len(self.responses) - self.idle_managers_num))
1638
1639    def next_event_after(self) -> Optional[Union[float, int]]:
1640        return self.time_left_before_next_event
1641
1642    def _loop_imp(self):
1643        try:
1644            next_event_after = None
1645            # while self.in_work() or (next_event_after is not None):
1646            while self.in_work():
1647                self._iteration_imp()
1648                next_event_after = self.next_event_after()
1649                # if (next_event_after is not None) and (not self.new_born_coroutines) and (not self.services_in_active_state) and (not self.responses):
1650                #     if self.use_internal_sleep or (not self.on_idle_handlers):
1651                #         sleep(next_event_after)
1652                #     else:
1653                #         for handler in self.on_idle_handlers:
1654                #             handler(next_event_after)
1655                # # else:
1656                # #     print(self.services_in_active_state_list)
1657
1658                if self.is_idle():
1659                    if self.use_internal_sleep:
1660                        if next_event_after is None:
1661                            sleep(high_cpu_utilisation_mode=self.high_cpu_utilisation_mode)
1662                        else:
1663                            sleep(next_event_after, high_cpu_utilisation_mode=self.high_cpu_utilisation_mode)
1664
1665                    for handler in self.on_idle_handlers:
1666                        handler(next_event_after)
1667        except CoroSchedulerDestroyRequestedException:
1668            pass
1669        finally:
1670            self.destroy()
1671    
1672    def _iteration_imp(self):
1673        current_coro_scheduler_buff = _current_coro_scheduler.scheduler
1674        _current_coro_scheduler.scheduler = self
1675        
1676        self.in_iteration = True
1677        
1678        cpu_clock_cycles_start_time = cpu_clock_cycles()
1679        # minus_delta_time = 0
1680        try:
1681            self.context_switches += len(self.new_born_coroutines) + len(self.responses)
1682            self.loop_iteration_start_time = self.get_loop_iteration_start_time()
1683
1684            if not self.keep_coro_execution_time_between_iterations:
1685                # self.coro_execution_time = type(self.coro_execution_time)()
1686                self.coro_execution_time = dict()
1687            if not self.keep_coro_longest_execution_time_between_iterations:
1688                # self.coro_longest_execution_time = type(self.coro_longest_execution_time)()
1689                self.coro_longest_execution_time = dict()
1690            if not self.keep_coro_workers_history_between_iterations:
1691                # self.coro_workers_history = type(self.coro_workers_history)()
1692                self.coro_workers_history = dict()
1693            if not self.keep_coro_full_history_between_iterations:
1694                # self.coro_full_history = type(self.coro_full_history)()
1695                self.coro_full_history = dict()
1696
1697            # minus_start_time = cpu_clock_cycles()
1698            self.run_low_latency_services()
1699            # minus_delta_time += cpu_clock_cycles() - minus_start_time
1700
1701            new_born_coroutines_buff = self.new_born_coroutines
1702            # self.new_born_coroutines = type(new_born_coroutines_buff)()
1703
1704            # TODO: we need to give first coro a list of all coros and an index (0 at the moment)
1705            # coro will make required work, find next Greenlet coro in the list and will swithch
1706            # to it given same list and an updated index. Next coro will do the same.
1707            # then an every coro returns and as result we will return here. After that,
1708            # loop will process all Asyncio coroutines in a normal maner
1709            # if self._coroutines_can_switch_directly:
1710            #     pass
1711            
1712            self.new_born_coroutines = list()
1713            self._is_current_coro_was_new_born = True
1714            for coro in new_born_coroutines_buff:
1715                coro: CoroWrapperBase = coro
1716                if self.execute_global_on_start_handlers(coro):
1717                    self.current_coro_start_time = self.get_coro_start_time()
1718                    self.current_coro_wrapper = coro
1719                    # minus_start_time = cpu_clock_cycles()
1720                    coro_exit_status: CoroExitStatus = coro.init(self.root_coro)
1721                    # minus_delta_time += cpu_clock_cycles() - minus_start_time
1722                    self.current_coro_wrapper = None
1723                    coro_execution_piece_delta_time = self.get_coro_start_time() - self.current_coro_start_time
1724                    self.current_coro_start_time = None
1725                    coro_id = coro.coro_id
1726                    if coro_id not in self.coro_execution_time:
1727                        self.coro_execution_time[coro_id] = 0
1728
1729                    self.coro_execution_time[coro_id] += coro_execution_piece_delta_time
1730                    if coro_id not in self.coro_longest_execution_time:
1731                        self.coro_longest_execution_time[coro_id] = 0
1732
1733                    self.coro_longest_execution_time[coro_id] = max(self.coro_longest_execution_time[coro_id], coro_execution_piece_delta_time)
1734                    if coro:
1735                        coro_last_result = coro.last_result
1736                        if not isinstance(coro_last_result, Request):
1737                            if self.on_wrong_request is None:
1738                                # minus_start_time = cpu_clock_cycles()
1739                                self.logger.warning(f'{datetime.now()} >> Wrong request {repr(coro_last_result)} of type {type(coro_last_result)} from coroutine {repr(coro)}')
1740                                self.kill_coro_by_id(coro_id)
1741                                # minus_delta_time += cpu_clock_cycles() - minus_start_time
1742                                continue
1743                            else:
1744                                # minus_start_time = cpu_clock_cycles()
1745                                coro_last_result = self.on_wrong_request(coro, coro_last_result)
1746                                # minus_delta_time += cpu_clock_cycles() - minus_start_time
1747                        
1748                        self.requests.append(coro_last_result)
1749                        self.coroutines[coro_id] = coro
1750                        continue
1751                    else:
1752                        # minus_start_time = cpu_clock_cycles()
1753                        self.process_coro_exit_status(coro, coro_exit_status)
1754                        # minus_delta_time += cpu_clock_cycles() - minus_start_time
1755
1756            responses_buff = self.responses
1757            # self.responses = type(responses_buff)()
1758            
1759            # TODO: we need to give first coro a list of all coros and an index (0 at the moment)
1760            # coro will make required work, find next Greenlet coro in the list and will swithch
1761            # to it given same list and an updated index. Next coro will do the same.
1762            # then an every coro returns and as result we will return here. After that,
1763            # loop will process all Asyncio coroutines in a normal maner
1764            # if self._coroutines_can_switch_directly:
1765            #     pass
1766            
1767            self.responses = list()
1768            self._is_current_coro_was_new_born = False
1769            for response in responses_buff:
1770                coro_id = response.coro_id
1771                if coro_id not in self.coroutines:
1772                    if not self.suppress_warnings_about_responses_to_not_existant_coroutines:
1773                        self.logger.warning(f'{datetime.now()} >> Coroutine {coro_id} has a response but does not exists: {repr(response)}')
1774                    
1775                    continue
1776                
1777                coro = self.coroutines[coro_id]
1778
1779                self.current_coro_start_time = self.get_coro_start_time()
1780                self.current_coro_wrapper = coro
1781                if isinstance(response, DirectResponse):
1782                    response = response.response
1783                
1784                # minus_start_time = cpu_clock_cycles()
1785                coro_exit_status: CoroExitStatus = coro(response)
1786                # minus_delta_time += cpu_clock_cycles() - minus_start_time
1787                self.current_coro_wrapper = None
1788                coro_execution_piece_delta_time = self.get_coro_start_time() - self.current_coro_start_time
1789                self.current_coro_start_time = None
1790                coro_id = coro.coro_id
1791                if coro_id not in self.coro_execution_time:
1792                    self.coro_execution_time[coro_id] = 0
1793
1794                self.coro_execution_time[coro_id] += coro_execution_piece_delta_time
1795                if coro_id not in self.coro_longest_execution_time:
1796                    self.coro_longest_execution_time[coro_id] = 0
1797
1798                self.coro_longest_execution_time[coro_id] = max(self.coro_longest_execution_time[coro_id], coro_execution_piece_delta_time)
1799                if coro:
1800                    coro_last_result = coro.last_result
1801                    if not isinstance(coro_last_result, Request):
1802                        if self.on_wrong_request is None:
1803                            # minus_start_time = cpu_clock_cycles()
1804                            self.logger.warning(f'{datetime.now()} >> Wrong request {repr(coro_last_result)} of type {type(coro_last_result)} from coroutine {repr(coro)}')
1805                            self.kill_coro_by_id(coro_id)
1806                            # minus_delta_time += cpu_clock_cycles() - minus_start_time
1807                            continue
1808                        else:
1809                            # minus_start_time = cpu_clock_cycles()
1810                            coro_last_result = self.on_wrong_request(coro, coro_last_result)
1811                            # minus_delta_time += cpu_clock_cycles() - minus_start_time
1812                    
1813                    self.requests.append(coro_last_result)
1814                    continue
1815                else:
1816                    del self.coroutines[coro_id]
1817                    # minus_start_time = cpu_clock_cycles()
1818                    self.process_coro_exit_status(coro, coro_exit_status)
1819                    # minus_delta_time += cpu_clock_cycles() - minus_start_time
1820
1821            self._is_current_coro_was_new_born = None
1822            requests_buff = self.requests
1823            # self.requests = type(requests_buff)()
1824            self.requests = list()
1825            for request in requests_buff:
1826                # if __debug__: dlog(f'λ >>> {repr(request)}')
1827                service_type = request.service_type
1828                service: 'Service' = self.get_service_by_type(service_type)
1829                # minus_start_time = cpu_clock_cycles()
1830                result = service.put_task(request)
1831                # minus_delta_time += cpu_clock_cycles() - minus_start_time
1832                if result is not None:
1833                    # if __debug__: dlog(f'λ <<< {repr(result)}')
1834                    self.responses.append(result)
1835
1836            self.services_in_work = 0
1837            self.services_in_foreground_work = 0
1838            self.services_in_active_state = 0
1839            # self.services_in_active_state_list = list()
1840            self.time_left_before_next_event = None
1841            for service_type, service in tuple(self.live_services.items()):
1842                # minus_start_time = cpu_clock_cycles()
1843                in_work_result = service.in_work()
1844                # minus_delta_time += cpu_clock_cycles() - minus_start_time
1845                if in_work_result:
1846                    self.services_in_work += 1
1847                    self.services_in_active_state += 1
1848                    # minus_start_time = cpu_clock_cycles()
1849                    results = service.iteration()
1850                    # minus_delta_time += cpu_clock_cycles() - minus_start_time
1851                    # minus_start_time = cpu_clock_cycles()
1852                    in_work_result = service.in_work()
1853                    # minus_delta_time += cpu_clock_cycles() - minus_start_time
1854                    if in_work_result:
1855                        self.services_in_foreground_work += 1 if service.in_forground_work() else 0
1856                        # minus_start_time = cpu_clock_cycles()
1857                        has_planned_events, time_left_before_next_event = service.time_left_before_next_event()
1858                        # minus_delta_time += cpu_clock_cycles() - minus_start_time
1859                        if has_planned_events and (time_left_before_next_event is not None):
1860                            self.services_in_active_state -= 1
1861                            if self.time_left_before_next_event is None:
1862                                self.time_left_before_next_event = time_left_before_next_event
1863                            elif time_left_before_next_event < self.time_left_before_next_event:
1864                                self.time_left_before_next_event = time_left_before_next_event
1865                        # else:
1866                        #     self.services_in_active_state_list.append(service_type)
1867                    else:
1868                        self.services_in_work -= 1
1869                        self.services_in_active_state -= 1
1870                        # self.services_in_active_state_list.append(service_type)
1871                    
1872                    if results:
1873                        # if __debug__:
1874                        #     for result in results:
1875                        #         if __debug__: dlog(f'λ <<< {repr(result)}')
1876
1877                        self.responses.extend(results)
1878        except CoroSchedulerDestroyRequestedException:
1879            raise
1880        except:
1881            # if __debug__: dlog('Loop Exception')
1882            self.logger.exception(f'{datetime.now()} >> Loop Exception')
1883            raise
1884        finally:
1885            self._update_in_work()
1886            self._update_is_awake()
1887            self.loop_iteration_delta_time = self.get_loop_iteration_start_time() - self.loop_iteration_start_time
1888            self.loop_cpu_clock_cycles = cpu_clock_cycles() - cpu_clock_cycles_start_time
1889            # self.sliding_window.append(cpu_clock_cycles() - cpu_clock_cycles_start_time - minus_delta_time)
1890            # self.sliding_window.append(minus_delta_time)
1891            self.iteration_index += 1
1892            # print()
1893            _current_coro_scheduler.scheduler = current_coro_scheduler_buff
1894            self.in_iteration = False
1895
1896
1897class CoroSchedulerGreenlet(CoroSchedulerBase):
1898    def __init__(self, logger: Optional[logging.Logger] = None):
1899        super().__init__(logger)
1900        self.root_coro_loop = greenlet(self._loop_imp)            # type: Coro
1901        # self.root_coro_iteration = greenlet(self._iteration_imp)  # type: Coro
1902        self.root_coro_iteration = greenlet(self._iter_wrapper)  # type: Coro
1903        # self._root_coro = None                                     # type: Optional[Coro]
1904        self.root_coro = None                                     # type: Optional[Coro]
1905        self._coroutines_can_switch_directly = True
1906    
1907    def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType:
1908        return coro_type or CoroType.auto
1909
1910    def loop(self):
1911        if self.need_to_log_loop_start_and_end:
1912            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1913        
1914        self.root_coro = self.root_coro_loop
1915
1916        # if __debug__: dlog('Switch to root_coro')
1917        self.root_coro.switch()
1918        # if __debug__: dlog('Switch from root_coro')
1919        self.root_coro = None
1920        self.root_coro_loop = greenlet(self._loop_imp)
1921    
1922    def iteration(self):
1923        # global _debug_log_counter
1924        # last_debug_log_counter = _debug_log_counter
1925        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1926        
1927        self.root_coro = self.root_coro_iteration
1928        in_work = False
1929        try:
1930            # if __debug__: dlog('Switch to root_coro (iteration)')
1931            in_work = self.root_coro.switch(True)
1932            # if __debug__: dlog('Switch from root_coro (iteration)')
1933        except GreenletExit:
1934            self.stop_iteration()
1935            # if __debug__: dlog('Switch from root_coro (GreenletExit)')
1936        except:
1937            self.stop_iteration()
1938            # if __debug__: dlog('Switch from root_coro (Exception):')
1939            # if __debug__: dlog(sys.exc_info())
1940            raise
1941        finally:
1942            self.root_coro = None
1943            # if __debug__:
1944            #     if last_debug_log_counter < _debug_log_counter:
1945            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1946        
1947        return in_work
1948    
1949    def stop_iteration(self):
1950        try:
1951            in_work = self.root_coro.switch(False)
1952        except GreenletExit:
1953            self.stop_iteration()
1954            # if __debug__: dlog('Stop root_coro (GreenletExit)')
1955        except:
1956            self.stop_iteration()
1957            # if __debug__: dlog('Stop root_coro (Exception):')
1958            # if __debug__: dlog(sys.exc_info())
1959            raise
1960        finally:
1961            self.root_coro = None
1962            self.root_coro_iteration = greenlet(self._iter_wrapper)
1963    
1964    def _iter_wrapper(self, proceed: bool=True):
1965        try:
1966            while proceed:
1967                self._iteration_imp()
1968                proceed = greenlet.getcurrent().parent.switch(self.in_work())
1969        except CoroSchedulerDestroyRequestedException:
1970            pass
1971        finally:
1972            self.destroy()
1973
1974
1975class CoroSchedulerAwaitable(CoroSchedulerBase):
1976    def __init__(self, logger: Optional[logging.Logger] = None):
1977        super().__init__(logger)
1978        self.root_coro = None                                     # type: Optional[Coro]
1979    
1980    def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType:
1981        return CoroType.awaitable
1982
1983    def loop(self):
1984        if self.need_to_log_loop_start_and_end:
1985            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1986        
1987        self._loop_imp()
1988    
1989    def iteration(self):
1990        # global _debug_log_counter
1991        # last_debug_log_counter = _debug_log_counter
1992        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1993        
1994        need_to_destroy = False
1995        try:
1996            self._iteration_imp()
1997            return self.in_work()
1998        except CoroSchedulerDestroyRequestedException:
1999            need_to_destroy = True
2000        except:
2001            need_to_destroy = True
2002            raise
2003        finally:
2004            # if __debug__:
2005            #     if last_debug_log_counter < _debug_log_counter:
2006            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
2007            
2008            if need_to_destroy:
2009                self.destroy()
2010
2011
2012CoroScheduler = CoroSchedulerGreenlet if greenlet_awailable else CoroSchedulerAwaitable
2013
2014
2015CoroSchedulerType: TypeAlias = Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]
2016
2017
2018def current_interface() -> Optional['Interface']:
2019    return current_coro_scheduler().current_interface()
2020
2021
2022def execute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2023    return coro_worker(current_interface(), *args, **kwargs)
2024
2025
2026exec_coro = execute_coro
2027ecoro = execute_coro
2028
2029
2030async def aexecute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2031    return await coro_worker(current_interface(), *args, **kwargs)
2032
2033
2034aexec_coro = aexecute_coro
2035aecoro = aexecute_coro
2036
2037
2038@contextmanager
2039def around_await():
2040    """
2041    It is very bad idea to await from inside the greenlet since it might lead to problems with stack
2042    state in an outside awaitable code and it's loop.
2043    Must be run around `await ...` from inside Coro or Service when running from inside external async loop
2044    :return:
2045    """
2046    loop = _current_coro_scheduler.scheduler
2047    try:
2048        yield
2049    finally:
2050        _current_coro_scheduler.scheduler = loop
2051
2052
2053class Request:
2054    def __init__(self, coro: 'CoroWrapperBase', service_type: ServiceType, *args, **kwargs):
2055        self.coro = coro
2056        self.coro_id = coro.coro_id
2057        self.service_type = service_type
2058        self.args = args
2059        self.kwargs = kwargs
2060    
2061    def __repr__(self):
2062        return f'<{self.__class__.__name__}(coro: {self.coro}, coro_id: {self.coro_id}, service_type: {self.service_type}, args: {self.args}, kwargs: {self.kwargs})>'
2063
2064
2065class Response:
2066    def __init__(self, coro_id: CoroID, service_type: ServiceType, response: Any, exception: Optional[BaseException]=None):
2067        self.coro_id = coro_id
2068        self.service_type = service_type
2069        self.response = response
2070        self.exception = exception
2071
2072    def __call__(self) -> Any:
2073        if self.exception:
2074            raise self.exception
2075        return self.response
2076    
2077    def __repr__(self):
2078        return f'<{self.__class__.__name__}(coro_id: {self.coro_id}, service_type: {self.service_type}, response: {self.response}, exception: {self.exception})>'
2079
2080
2081class DirectResponse(Response):
2082    pass
2083
2084
2085class CoroExitStatus:
2086    def __init__(self, exception, properly_handled):
2087        self.exception = exception
2088        self.properly_handled = properly_handled
2089
2090
2091class CoroWrapperBase:
2092    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2093        if isinstance(worker, EntityArgsHolder):
2094            worker, args, kwargs = worker.entity_args_kwargs()
2095        
2096        self.worker = worker               # type: Worker
2097        self.init_args = args              # type: Tuple[Any, ...]
2098        self.init_kwargs = kwargs          # type: Dict
2099        self.coro_id: CoroID = coro_id             # type: CoroID
2100        self.loop: CoroSchedulerBase = loop                   # type: CoroSchedulerBase
2101        self.coro = None                   # type: Optional[Coro]
2102        self.parent_coro = None            # type: Optional[Coro]
2103        self.interface = None              # type: Optional[Interface]
2104        self.last_result = None            # type: Optional[Union[Request, Any]] # return value can be any
2105        self.exception = None              # type: Optional[Exception] # If there was an exception
2106        self.coro_on_del_handlers = set()  # type: Set[Callable]
2107        self._make_coro_method = self._raise_not_implemented_error  # type: Callable
2108        self._make_interface = self._raise_not_implemented_error  # type: Callable
2109        self._init_method = self._raise_not_implemented_error  # type: Callable
2110        self._call_method = self._raise_not_implemented_error  # type: Callable
2111        self._throw_method = self._raise_not_implemented_error  # type: Callable
2112        self._close_method = self._raise_not_implemented_error  # type: Callable
2113        self._current_call_method = self._call_method  # type: Callable
2114        self._is_background_coro: bool = False
2115        self.loop.foreground_coro_num += 1
2116    
2117    def _travers_through_coro_on_del_handlers(self, coro_exit_status: 'CoroExitStatus'):
2118        if not self.coro_on_del_handlers:
2119            return coro_exit_status
2120
2121        exception_properly_handled = coro_exit_status.properly_handled
2122        for handler in self.coro_on_del_handlers:
2123            exception_properly_handled = handler(self) or exception_properly_handled
2124        coro_exit_status.properly_handled = exception_properly_handled
2125        return coro_exit_status
2126
2127    def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']:
2128        self.parent_coro = parent_coro
2129        self.coro = self._make_coro_method()
2130        self.interface = self._make_interface()
2131        current_coro_interface_buff = self.loop.current_coro_interface
2132        self.loop.current_coro_interface = self.interface
2133        try:
2134            self._init_method(self.init_args, self.init_kwargs)
2135            if self:
2136                return None
2137            else:
2138                # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}')
2139                return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True))
2140        except:
2141            self.exception = get_exception()
2142            if not self.coro_on_del_handlers:
2143                return CoroExitStatus(self.exception, False)
2144            return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False))
2145        finally:
2146            self.loop.current_coro_interface = current_coro_interface_buff
2147
2148    def __call__(self, *args, **kwargs) -> Union[None, 'CoroExitStatus']:
2149        current_coro_interface_buff = self.loop.current_coro_interface
2150        self.loop.current_coro_interface = self.interface
2151        try:
2152            self._current_call_method(*args, **kwargs)
2153            if self:
2154                return None
2155            else:
2156                # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}')
2157                return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True))
2158        except:
2159            self.exception = get_exception()
2160            if not self.coro_on_del_handlers:
2161                return CoroExitStatus(self.exception, False)
2162            return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False))
2163        finally:
2164            self.loop.current_coro_interface = current_coro_interface_buff
2165    
2166    def __bool__(self) -> bool:
2167        raise NotImplementedError
2168
2169    def add_on_coro_del_handler(self, callback: OnCoroDelHandler):
2170        self.coro_on_del_handlers.add(callback)
2171    
2172    def _raise_not_implemented_error(self, *args, **kwargs):
2173        pass
2174        raise NotImplementedError
2175        return self._raise_not_implemented_error  # Suppressing lint error
2176    
2177    def _current_throw_method_helper(self, *args, **kwargs):
2178        self._throw_method(*args, **kwargs)
2179        self._current_call_method = self._call_method
2180    
2181    def request_throw(self) -> Any:
2182        self._current_call_method = self._current_throw_method_helper
2183    
2184    def _current_close_method_helper(self, *args, **kwargs):
2185        self._close_method()
2186        self._current_call_method = self._call_method
2187    
2188    def request_close(self) -> Any:
2189        self._current_call_method = self._current_close_method_helper
2190    
2191    @property
2192    def is_background_coro(self):
2193        return self._is_background_coro
2194    
2195    @is_background_coro.setter
2196    def is_background_coro(self, value: bool):
2197        if value:
2198            if not self._is_background_coro:
2199                self.loop.foreground_coro_num -= 1
2200        else:
2201            if self._is_background_coro:
2202                self.loop.foreground_coro_num += 1
2203        
2204        self._is_background_coro = value
2205
2206    def destroy(self):
2207        if self.interface is not None:
2208            if isinstance(self.interface, Interface):
2209                self.interface.destroy()
2210        
2211        if not self.is_background_coro:
2212            self.loop.foreground_coro_num -= 1
2213        
2214        self._is_background_coro = None
2215        self.init_args = None
2216        self.init_kwargs = None
2217        self.coro_id = None
2218        self.worker = None
2219        self.loop = None
2220        self.coro = None
2221        self.parent_coro = None
2222        self.interface = None
2223        self.last_result = None
2224        self.exception = None
2225        self.coro_on_del_handlers = None
2226        self._make_coro_method = None
2227        self._make_interface = None
2228        self._init_method = None
2229        self._call_method = None
2230        self._throw_method = None
2231        self._close_method = None
2232        self._current_call_method = None
2233
2234
2235class CoroWrapperGreenlet(CoroWrapperBase):
2236    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2237        super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs)
2238        self._make_coro_method = self._make_coro_method_imp
2239        self._make_interface = self._make_interface_imp
2240        self._init_method = self._init_method_imp
2241        self._call_method = self._call_method_imp
2242        self._throw_method = self._throw_method_imp  # type: Callable
2243        self._close_method = self._close_method_imp  # type: Callable
2244        self._current_call_method = self._call_method  # type: Callable
2245    
2246    def _make_coro_method_imp(self):
2247        return greenlet(self.worker, self.parent_coro)
2248    
2249    def _make_interface_imp(self):
2250        return InterfaceGreenlet(self.loop, self)
2251
2252    def _init_method_imp(self, init_args, init_kwargs):
2253        try:
2254            # if __debug__: dlog(f'λ => (init): {func_info(self.worker.worker)}')
2255            self.last_result = self.coro.switch(self.interface, *init_args, **init_kwargs)  # TODO: wrong
2256            # if __debug__: dlog(f'λ <= (init): {repr(self.worker.worker)}')
2257        except GreenletExit:
2258            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2259            self.last_result = self.interface.result
2260        except:
2261            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2262            self.last_result = None
2263            raise
2264
2265    def _call_method_imp(self, *args, **kwargs):
2266        try:
2267            # if __debug__: dlog(f'λ => (call): {func_info(self.worker.worker)}')
2268            self.last_result = self.coro.switch(*args, **kwargs)  # TODO: wrong
2269            # if __debug__: dlog(f'λ <= (call): {repr(self.worker.worker)}')
2270        except GreenletExit:
2271            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2272            self.last_result = self.interface.result
2273        except:
2274            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2275            self.last_result = None
2276            raise
2277
2278    # def _throw_method_imp(self, *args, **kwargs):
2279    def _throw_method_imp(self, ex_type, ex_value=None, ex_traceback=None):
2280        try:
2281            # if __debug__: dlog(f'λ => (throw): {func_info(self.worker.worker)}')
2282            self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback)
2283            # if __debug__: dlog(f'λ <= (throw): {repr(self.worker.worker)}')
2284        except GreenletExit:
2285            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2286            self.last_result = self.interface.result
2287        except:
2288            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2289            self.last_result = None
2290            raise
2291
2292    def _close_method_imp(self):
2293        try:
2294            # if __debug__: dlog(f'λ => (close): {func_info(self.worker.worker)}')
2295            self.last_result = self.coro.throw()
2296            # if __debug__: dlog(f'λ <= (close): {repr(self.worker.worker)}')
2297        except GreenletExit:
2298            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2299            self.last_result = self.interface.result
2300        except:
2301            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2302            self.last_result = None
2303            raise
2304    
2305    def __bool__(self) -> bool:
2306        return bool(self.coro)
2307
2308    def destroy(self):
2309        if isinstance(self.worker, GreenletWorkerWrapper):
2310            self.worker.destroy()
2311        
2312        return super().destroy()
2313
2314
2315async def init_asyncgenerator(entity: Callable):
2316    try:
2317        result = await entity.asend(None)
2318        return (result, None)
2319    except:
2320        exception = get_exception()
2321        return (None, exception)
2322
2323
2324async def call_asyncgenerator(entity: Callable, *args, **kwargs):
2325    try:
2326        result = await entity.asend(*args, **kwargs)
2327        return (result, None)
2328    except:
2329        exception = get_exception()
2330        return (None, exception)
2331
2332
2333async def throw_asyncgenerator(entity: Callable, *args, **kwargs):
2334    try:
2335        result = await entity.athrow(*args, **kwargs)  # await entity.athrow(type[, value[, traceback]]) ; https://docs.python.org/3/reference/expressions.html#agen.asend
2336        return (result, None)
2337    except:
2338        exception = get_exception()
2339        return (None, exception)
2340
2341
2342async def close_asyncgenerator(entity: Callable):
2343    try:
2344        result = await entity.aclose()
2345        return (result, None)
2346    except:
2347        exception = get_exception()
2348        return (None, exception)
2349
2350
2351async def init_asyncgeneratorfunction(entity: Callable, *args, **kwargs):
2352    try:
2353        entity = entity(*args, **kwargs)
2354        result = await entity.asend(None)
2355        return (entity, result, None)
2356    except:
2357        exception = get_exception()
2358        return (entity, None, exception)
2359
2360
2361async def awaitable_wrapper(entity: Awaitable):
2362    return await entity
2363
2364
2365async def callable_wrapper(entity: Callable, *args, **kwargs):
2366    return entity(*args, **kwargs)
2367
2368
2369class CoroWrapperAsyncAwait(CoroWrapperBase):
2370    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2371        super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs)
2372        self._make_coro_method = self._make_coro_method_imp
2373        self._make_interface = self._make_interface_imp
2374        self.in_run_state = False
2375        self.subtype = self._setup_subtype()
2376        self._current_call_method = self._call_method  # type: Callable
2377
2378    def _make_coro_method_imp(self):
2379        return self.worker
2380    
2381    def _make_interface_imp(self):
2382        return InterfaceAsyncAwait(self.loop, self)
2383
2384    def __bool__(self) -> bool:
2385        return bool(self.in_run_state)
2386    
2387    def _setup_subtype(self):
2388        if inspect.iscoroutine(self.worker):
2389            # if __debug__: dlog('λ >>>\tCOROUTINE')
2390            self._init_method = self._init_coroutine
2391            self._call_method = self._call_coroutine
2392            self._throw_method = self._throw_coroutine
2393            self._close_method = self._close_coroutine
2394            return 0
2395        elif inspect.isgenerator(self.worker):
2396            # if __debug__: dlog('λ >>>\tGENERATOR')
2397            self._init_method = self._init_generator
2398            self._call_method = self._call_coroutine
2399            self._throw_method = self._throw_coroutine
2400            self._close_method = self._close_coroutine
2401            return 1
2402        elif inspect.iscoroutinefunction(self.worker):
2403            # if __debug__: dlog('λ >>>\tCOROUTINE FUNCTION')
2404            self._init_method = self._init_coroutinefunction
2405            self._call_method = self._call_coroutine
2406            self._throw_method = self._throw_coroutine
2407            self._close_method = self._close_coroutine
2408            return 2
2409        elif inspect.isgeneratorfunction(self.worker):
2410            # if __debug__: dlog('λ >>>\tGENERATOR FUNCTION')
2411            self._init_method = self._init_generatorfunction
2412            self._call_method = self._call_coroutine
2413            self._throw_method = self._throw_coroutine
2414            self._close_method = self._close_coroutine
2415            return 3
2416        elif inspect.isasyncgen(self.worker):
2417            # if __debug__: dlog('λ >>>\tASYNC GENERATOR')
2418            self._init_method = self._init_asyncgenerator
2419            self._call_method = self._call_asyncgenerator
2420            self._throw_method = self._throw_asyncgenerator
2421            self._close_method = self._close_asyncgenerator
2422            return 4
2423        elif inspect.isasyncgenfunction(self.worker):
2424            # if __debug__: dlog('λ >>>\tASYNC GENERATOR FUNCTION')
2425            self._init_method = self._init_asyncgeneratorfunction
2426            self._call_method = self._call_asyncgenerator
2427            self._throw_method = self._throw_asyncgenerator
2428            self._close_method = self._close_asyncgenerator
2429            return 5
2430        elif inspect.isawaitable(self.worker):
2431            # if __debug__: dlog('λ >>>\tAWAITABLE')
2432            self._init_method = self._init_awaitable
2433            self._call_method = self._call_coroutine
2434            self._throw_method = self._throw_coroutine
2435            self._close_method = self._close_coroutine
2436            return 6
2437        elif callable(self.worker):
2438            # if __debug__: dlog('λ >>>\tCALLABLE')
2439            self._init_method = self._init_callable
2440            self._call_method = self._call_coroutine
2441            self._throw_method = self._throw_coroutine
2442            self._close_method = self._close_coroutine
2443            return 7
2444        else:
2445            raise TypeError(f'{self.worker} is neither an awaitable nor a wrapper for an awaitable')
2446    
2447    def _init_coroutine(self, init_args, init_kwargs):
2448        try:
2449            self.in_run_state = True
2450            self.last_result = self.coro.send(None)
2451        except StopIteration as ex:
2452            self.in_run_state = False
2453            self.last_result = ex.value
2454        except:
2455            self.in_run_state = False
2456            raise
2457            
2458    def _call_coroutine(self, *args, **kwargs):
2459        try:
2460            self.in_run_state = True
2461            self.last_result = self.coro.send(*args, **kwargs)
2462        except StopIteration as ex:
2463            self.in_run_state = False
2464            self.last_result = ex.value
2465        except:
2466            self.in_run_state = False
2467            raise
2468    
2469    if sys.version_info >= (3, 12):
2470    # if (3, 12) <= PYTHON_VERSION_INT:
2471        def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None):
2472            try:
2473                self.in_run_state = True
2474                if ex_value is None:
2475                    ex_value = ex_type()
2476                
2477                self.last_result = self.coro.throw(ex_value)  # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python.
2478            except StopIteration as ex:
2479                self.in_run_state = False
2480                self.last_result = ex.value
2481            except:
2482                self.in_run_state = False
2483                raise
2484    else:
2485        def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None):
2486            try:
2487                self.in_run_state = True
2488                self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback)  # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python.
2489            except StopIteration as ex:
2490                self.in_run_state = False
2491                self.last_result = ex.value
2492            except:
2493                self.in_run_state = False
2494                raise
2495            
2496    def _close_coroutine(self, *args, **kwargs):
2497        try:
2498            self.in_run_state = True
2499            self.last_result = self.coro.close()
2500            self.in_run_state = False
2501        except GeneratorExit as ex:
2502            self.in_run_state = False
2503        except:
2504            self.in_run_state = False
2505            raise
2506
2507    def _init_generator(self, init_args, init_kwargs):
2508        try:
2509            self.in_run_state = True
2510            self.last_result = next(self.coro)  # ToDo: investigate how to provide an initial parameters
2511        except StopIteration as ex:
2512            self.in_run_state = False
2513            self.last_result = ex.value
2514        except:
2515            self.in_run_state = False
2516            raise
2517
2518    def _init_coroutinefunction(self, init_args, init_kwargs):
2519        self.coro = self.coro(self.interface, *init_args, **init_kwargs)
2520        self._init_coroutine(None, None)
2521
2522    def _init_generatorfunction(self, init_args, init_kwargs):
2523        self.coro = self.coro(self.interface, *init_args, **init_kwargs)
2524        self._init_generator(None, None)
2525
2526    def _init_asyncgenerator(self, init_args, init_kwargs):
2527        try:
2528            self.in_run_state = True
2529            entity = init_asyncgenerator(self.coro)
2530            result = entity.send(None)
2531        except StopIteration as ex:
2532            self.in_run_state = False
2533            result = ex.value
2534        except:
2535            self.in_run_state = False
2536            raise
2537        
2538        self.last_result, exception = result
2539        if exception is not None:
2540            self.in_run_state = False
2541            if not isinstance(exception, StopAsyncIteration):
2542                raise exception
2543
2544    def _call_asyncgenerator(self, *args, **kwargs):
2545        try:
2546            self.in_run_state = True
2547            entity = call_asyncgenerator(self.coro, *args, **kwargs)
2548            result = entity.send(None)
2549        except StopIteration as ex:
2550            self.in_run_state = False
2551            result = ex.value
2552        except:
2553            self.in_run_state = False
2554            raise
2555        
2556        self.last_result, exception = result
2557        if exception is not None:
2558            self.in_run_state = False
2559            if not isinstance(exception, StopAsyncIteration):
2560                raise exception
2561
2562    def _throw_asyncgenerator(self, *args, **kwargs):
2563        try:
2564            self.in_run_state = True
2565            entity = throw_asyncgenerator(self.coro, *args, **kwargs)
2566            result = entity.send(None)
2567        except StopIteration as ex:
2568            self.in_run_state = False
2569            result = ex.value
2570        except:
2571            self.in_run_state = False
2572            raise
2573        
2574        self.last_result, exception = result
2575        if exception is not None:
2576            self.in_run_state = False
2577            if not isinstance(exception, StopAsyncIteration):
2578                raise exception
2579
2580    def _close_asyncgenerator(self):
2581        try:
2582            self.in_run_state = True
2583            entity = close_asyncgenerator(self.coro)
2584            result = entity.send(None)
2585        except StopIteration as ex:  # TODO: check maybe `GeneratorExit` will be raised
2586            self.in_run_state = False
2587            result = ex.value
2588        except:
2589            self.in_run_state = False
2590            raise
2591        
2592        self.last_result, exception = result
2593        if exception is not None:
2594            self.in_run_state = False
2595            if not isinstance(exception, StopAsyncIteration):
2596                raise exception
2597
2598    def _init_asyncgeneratorfunction(self, init_args, init_kwargs):
2599        try:
2600            self.in_run_state = True
2601            entity = init_asyncgeneratorfunction(self.coro, self.interface, *init_args, **init_kwargs)
2602            result = entity.send(None)
2603        except StopIteration as ex:
2604            self.in_run_state = False
2605            result = ex.value
2606        except:
2607            self.in_run_state = False
2608            raise
2609        
2610        self.coro, self.last_result, exception = result
2611        if exception is not None:
2612            self.in_run_state = False
2613            if not isinstance(exception, StopAsyncIteration):
2614                raise exception
2615
2616    def _init_awaitable(self, init_args, init_kwargs):
2617        self.coro = awaitable_wrapper(self.coro)
2618        self._init_coroutine()
2619
2620    def _init_callable(self, init_args, init_kwargs):
2621        self.coro = callable_wrapper(self.coro, self.interface, *init_args, **init_kwargs)
2622        self._init_coroutine()
2623
2624    def destroy(self):
2625        self._make_coro_method = None
2626        self._make_interface = None
2627        self.in_run_state = None
2628        self.subtype = None
2629        return super().destroy()
2630
2631
2632class GreenletWorkerWrapper:
2633    def __init__(self, worker: Worker):
2634        self.worker = worker
2635        
2636    def __call__(self, interface: 'InterfaceGreenlet', *args, **kwargs):
2637        result = None
2638        try:
2639            result = self.worker(interface, *args, **kwargs)
2640        finally:
2641            interface.register_result(result)  # in case if GreenletExit will be raised by greenlet framework
2642        return result
2643
2644    def destroy(self):
2645        self.worker = None
2646
2647
2648def find_coro_type(entity) -> CoroType:
2649    if isinstance(entity, EntityArgsHolder):
2650        entity, args, kwargs = entity.entity_args_kwargs()
2651
2652    if inspect.iscoroutine(entity) or inspect.isgenerator(entity) or inspect.iscoroutinefunction(entity) or inspect.isgeneratorfunction(entity) or inspect.isasyncgen(entity) or inspect.isasyncgenfunction(entity) or inspect.isawaitable(entity):
2653        return CoroType.awaitable
2654    elif callable(entity):
2655        return CoroType.greenlet
2656    else:
2657        raise TypeError(f'{entity} is neither an awaitable nor a greenlet')
2658
2659
2660def coro_wrapper_factory(coro_type: CoroType, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs) -> 'CoroWrapperBase':
2661    if CoroType.auto == coro_type:
2662        coro_type = find_coro_type(worker)
2663
2664    if CoroType.greenlet == coro_type:
2665        if isinstance(worker, EntityArgsHolder):
2666            worker, args, kwargs = worker.entity_args_kwargs()
2667            worker = GreenletWorkerWrapper(worker)
2668            worker: EntityArgsHolderExplicit = EntityArgsHolderExplicit(worker, args, kwargs)
2669            return CoroWrapperGreenlet(loop, coro_id, worker, *args, **kwargs)
2670        else:
2671            worker = GreenletWorkerWrapper(worker)
2672            return CoroWrapperGreenlet(loop, coro_id, worker, *args, **kwargs)
2673    elif CoroType.awaitable == coro_type:
2674        return CoroWrapperAsyncAwait(loop, coro_id, worker, *args, **kwargs)
2675    else:
2676        raise NotImplementedError
2677
2678
2679class Interface:
2680    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2681        self._loop: CoroSchedulerBase = loop                        # type: CoroSchedulerBase
2682        self._coro: CoroWrapperBase = coro                        # type: CoroWrapperBase
2683        self.coro_id: CoroID = coro.coro_id               # type: CoroID
2684        self.in_work: bool = False
2685        self.ignored_by_watchdog: bool = False
2686        self.logger: logging.Logger = self._loop.logger
2687        self.log: logging.Logger = self.logger
2688    
2689    @contextmanager
2690    def ignore_by_watchdog(self):
2691        current_state = self.ignored_by_watchdog
2692        self.ignored_by_watchdog = True
2693        try:
2694            yield
2695        finally:
2696            self.ignored_by_watchdog = current_state
2697    
2698    def _normalize_call_args_kwargs(self, service_type: Union[ServiceType, 'ServiceRequest'], args, kwargs) -> Tuple[Type['Service'], Tuple, Dict]:
2699        if inspect.isclass(service_type):
2700            if issubclass(service_type, Service):
2701                return service_type, args, kwargs
2702            elif issubclass(service_type, ServiceRequest):
2703                # return service_type, args, kwargs
2704                return self._normalize_call_args_kwargs(service_type()(*args, **kwargs))
2705            else:
2706                print(f'service_type of an unsupported type: {service_type}')
2707                raise TypeError(f'service_type of an unsupported type: {service_type}')
2708        elif isinstance(service_type, ServiceRequest):
2709            request: ServiceRequest = service_type
2710            service_type = request.default_service_type
2711            if service_type is None:
2712                print(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly')
2713                raise RuntimeError(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly')
2714            else:
2715                args, kwargs = args_kwargs(request, *args, **kwargs)
2716                return service_type, args, kwargs
2717        else:
2718            print(f'{service_type=}')
2719            raise ValueError(f'{service_type=}')
2720
2721    # def __call__(self, service_type: ServiceType, *args, **kwargs) -> Any:
2722    #     """
2723    #     Should be called from inside coroutines only.
2724    #     Will request some long running work to some service.
2725
2726    #     :param coro_id:
2727    #     :param service_type:
2728    #     :param args:
2729    #     :param kwargs:
2730    #     """
2731
2732    #     response = self.__put_task_method(self.__coro, service_type, *args, *kwargs)
2733    #     return response()
2734
2735    @overload
2736    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2737
2738    @overload
2739    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2740
2741    @overload
2742    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2743
2744    @overload
2745    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2746
2747    @overload
2748    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2749
2750    @overload
2751    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2752
2753    @overload
2754    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2755
2756    @overload
2757    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2758
2759    def __call__(self, service_type, *args, **kwargs):
2760        raise NotImplementedError
2761
2762    # !!! Must not be present in interface since leads to incorrect greenlets switches
2763    # def put_coro(self, coro_type: CoroType, coro_worker: Worker, *args, **kwargs) -> CoroID:
2764    #     self.in_work = True
2765    #     coro = self._loop.put_coro(coro_type, coro_worker, *args, **kwargs)
2766    #     self.in_work = False
2767    #     return coro.coro_id
2768
2769    def destroy(self):
2770        self._loop = None
2771        self._coro = None
2772        self.coro_id = None
2773        self.in_work = None
2774
2775
2776class InterfaceGreenlet(Interface):
2777    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2778        super(InterfaceGreenlet, self).__init__(loop, coro)
2779        self.result = None
2780
2781    @overload
2782    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2783
2784    @overload
2785    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2786
2787    @overload
2788    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2789
2790    @overload
2791    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2792
2793    @overload
2794    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2795
2796    @overload
2797    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2798
2799    @overload
2800    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2801
2802    @overload
2803    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2804
2805    def __call__(self, service_type, *args, **kwargs):
2806        """
2807        Should be called from inside coroutines only.
2808        Will request some long running work to some service.
2809
2810        :param coro_id:
2811        :param service_type:
2812        :param args:
2813        :param kwargs:
2814        """
2815
2816        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2817
2818        self.in_work = True
2819        # if __debug__: dlog(f'λ <= (request): <{func_info(self._coro.worker.worker, False)}>: {service_type}, {args}, {kwargs}')
2820        try:
2821            loop: CoroSchedulerGreenlet = self._loop
2822            response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs))
2823            # TODO: we will switch to the next coro and do necessary preparations and reactions
2824            # if loop._coroutines_can_switch_directly:
2825            #     if loop._is_current_coro_was_new_born:  
2826            #         ...
2827            #     else:
2828            #         ...
2829            # else:
2830            #     response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs))
2831        except AttributeError:
2832            # if __debug__: dlog(f'x λ: {id(self._loop)}, {self._loop}, root_coro: {self._loop.root_coro}; root_coro_iteration: {self._loop.root_coro_iteration}; current_loop: {self._loop.current_loop()}; current_interface: {self._loop.current_interface()}')
2833            raise
2834        # if __debug__: dlog(f'λ => (response): <{func_info(self._coro.worker.worker)}>: {repr(response)}')
2835        self.in_work = False
2836        if isinstance(response, Response):
2837            return response()
2838        
2839        dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}")
2840        raise RuntimeError(f'Wrong type of response from the service: {type(response)}; {repr(response)}.')
2841    
2842    def register_result(self, result: Any):
2843        self.result = result
2844
2845    def destroy(self):
2846        self.result = None
2847        return super().destroy()
2848
2849
2850class InterfaceAsyncAwait(Interface):
2851    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2852        super(InterfaceAsyncAwait, self).__init__(loop, coro)
2853
2854    @overload
2855    async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2856
2857    @overload
2858    async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2859
2860    @overload
2861    async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2862
2863    @overload
2864    async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2865
2866    @overload
2867    async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2868
2869    @overload
2870    async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2871
2872    @overload
2873    async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2874
2875    @overload
2876    async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2877
2878    async def __call__(self, service_type, *args, **kwargs):
2879        """
2880        Should be called from inside coroutines only.
2881        Will request some long running work to some service.
2882
2883        :param coro_id:
2884        :param service_type:
2885        :param args:
2886        :param kwargs:
2887        """
2888
2889        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2890
2891        self.in_work = True
2892        response = await yield_task_from_asyncawait(Request(self._coro, service_type, *args, **kwargs))
2893        self.in_work = False
2894        if isinstance(response, Response):
2895            return response()
2896        
2897        dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}")
2898        raise RuntimeError('Wrong type of response from the service')
2899
2900
2901class InterfaceFake(Interface):
2902    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2903        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2904        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2905        self.coro_id: CoroID = None                      # type: CoroID
2906        self.in_work: bool = False
2907
2908    @overload
2909    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2910
2911    @overload
2912    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2913
2914    @overload
2915    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2916
2917    @overload
2918    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2919
2920    @overload
2921    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2922
2923    @overload
2924    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2925
2926    @overload
2927    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2928
2929    @overload
2930    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2931
2932    def __call__(self, service_type, *args, **kwargs) -> Any:
2933        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2934        return None
2935
2936
2937class InterfaceFakeAsyncAwait(Interface):
2938    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2939        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2940        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2941        self.coro_id: CoroID = None                      # type: CoroID
2942        self.in_work: bool = False
2943
2944    @overload
2945    async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2946
2947    @overload
2948    async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2949
2950    @overload
2951    async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2952
2953    @overload
2954    async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2955
2956    @overload
2957    async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2958
2959    @overload
2960    async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2961
2962    @overload
2963    async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2964
2965    @overload
2966    async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2967
2968    async def __call__(self, service_type, *args, **kwargs):
2969        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2970        return None
2971
2972
2973class CallerCoroInfo:
2974    def __init__(self, coro: CoroWrapperBase):
2975        self.coro: CoroWrapperBase = coro
2976        self.coro_id: CoroID = coro.coro_id
2977        self.coro_type: Type = type(coro)
2978
2979
2980class WrongServiceRequestError(Exception):
2981    pass
2982
2983
2984class ServiceRequest:
2985    default_service_type: Optional[Type['Service']] = None
2986    default__request__type__: int = 0
2987
2988    def __init__(self):
2989        self.request_type: int = None  # type: Optional[int]
2990        self.args: Optional[Tuple] = None          # type: Optional[Tuple]
2991        self.kwargs: Optional[Dict] = None        # type: Optional[Dict]
2992        self.provide_to_request_handler: bool = False
2993
2994    def _save(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest':
2995        self.request_type = __request__type__
2996        self.args = args
2997        self.kwargs = kwargs
2998        return self
2999    
3000    def _copy(self) -> 'ServiceRequest':
3001        raise NotImplementedError
3002
3003    def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest':
3004        return self._copy()._save(__request__type__, *args, **kwargs)
3005    
3006    def __call__(self, *args: Any, **kwds: Any) -> 'ServiceRequest':
3007        """should call self._save() with some default __request__type__. Required for the Interface(Type[ServiceRequest], *args, **kwargs) call. Must return self
3008
3009        Returns:
3010            ServiceRequest: Must return self
3011        """
3012        return self._save(self.default__request__type__, *args, **kwds)
3013    
3014    def interface(self) -> Any:
3015        current_interface()(self.default_service_type, self)
3016    
3017    i = interface
3018    
3019    async def async_interface(self) -> Any:
3020        await current_interface()(self.default_service_type, self)
3021    
3022    ai = async_interface
3023    
3024    def __repr__(self):
3025        return f'<{self.__class__.__name__}(request_type: {self.request_type}, args: {self.args}, kwargs: {self.kwargs})>'
3026
3027
3028class TypedServiceRequest(ServiceRequest, Generic[ServiceResponseTypeVar]):
3029    pass
3030
3031
3032class ServiceRequestMethodMixin:
3033    def __init__(self, service: 'Service') -> None:
3034        self.service = service
3035
3036    def __call__(self, *args: Any, **kwds: Any) -> Any:
3037        raise NotImplementedError
3038
3039    def full_processing_iteration(self):
3040        raise NotImplementedError
3041
3042    def in_work(self) -> bool:
3043        raise NotImplementedError
3044
3045    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
3046        raise NotImplementedError
3047
3048
3049class DualImmediateProcessingServiceMixin:
3050    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]:
3051        if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)):
3052            return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs)
3053        else:
3054            return self.single_task_registration_or_immediate_processing_single(*args, **kwargs)
3055
3056    def single_task_registration_or_immediate_processing_multiple(self, request: ServiceRequest
3057                                                         ) -> Tuple[bool, Any, None]:
3058        return self.resolve_request(request)
3059
3060    def single_task_registration_or_immediate_processing_single(
3061            self, *args, **kwargs
3062    ) -> Tuple[bool, Optional[CoroID], Any]:
3063        raise NotImplementedError
3064
3065
3066ServiceProcessingResultExists = bool
3067ServiceProcessingResult = Any
3068ServiceProcessingException = Optional[BaseException]
3069ServiceProcessingResponse = Tuple[ServiceProcessingResultExists, ServiceProcessingResult, ServiceProcessingException]
3070
3071
3072class Service(Iterable):
3073    def __init__(self, loop: CoroSchedulerBase):
3074        super(Service, self).__init__()
3075        self._loop: CoroSchedulerBase = loop                     # type: CoroSchedulerBase
3076        # self._requests = list()               # type: List[Request]
3077        self._responses: List[Response] = list()              # type: List[Response]
3078        self.current_caller_coro_info: Optional[CallerCoroInfo] = None  # type: Optional[CallerCoroInfo]
3079        self._request_workers = dict()        # type: Dict[int, Callable]
3080
3081    def iteration(self) -> Optional[List[Response]]:
3082        # requests = self._requests
3083        # self._requests = type(self._requests)()
3084        self._responses = list()
3085        # for request in requests:
3086        #     self.current_caller_coro_info = CallerCoroInfo(request.coro_id)
3087        #     result_exists, result = \
3088        #         self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3089        #     if result_exists:
3090        #         self.register_response(request.coro_id, result)
3091        # self.current_caller_coro_info = None
3092        try:
3093            self.full_processing_iteration()
3094        except:
3095            # if __debug__: dlog(sys.exc_info())
3096            raise
3097        return self._responses
3098
3099    def make_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3100        return Response(coro_id, type(self), response, exception)
3101
3102    def register_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3103        self._responses.append(self.make_response(coro_id, response, exception))
3104
3105    def put_task(self, request: Request) -> Optional[Response]:
3106        self.current_caller_coro_info = CallerCoroInfo(request.coro)
3107        result_exists, result, exception = \
3108            self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3109        self.current_caller_coro_info = None
3110        if result_exists or exception:
3111            return self.make_response(request.coro_id, result, exception)
3112        return None
3113
3114    def resolve_request(self, request: ServiceRequest):
3115        try:
3116            if request.provide_to_request_handler:
3117                return self._request_workers[request.request_type](request, *request.args, **request.kwargs)
3118            else:
3119                return self._request_workers[request.request_type](*request.args, **request.kwargs)
3120        except:
3121            return True, None, get_exception()
3122
3123    def try_resolve_request(self, *args, **kwargs):
3124        possible_request: Optional[ServiceRequest] = None
3125        if (len(args) == 1) and (len(kwargs) == 0):
3126            possible_request = args[0]
3127        elif (len(kwargs) == 1) and (len(args) == 0):
3128            possible_request = kwargs.pop('request', None)
3129
3130        if possible_request is not None:
3131            if isinstance(possible_request, ServiceRequest):
3132                return self.resolve_request(possible_request)
3133
3134        return None
3135
3136    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> ServiceProcessingResponse:
3137        raise NotImplementedError
3138
3139    def full_processing_iteration(self):
3140        raise NotImplementedError
3141
3142    def in_work(self) -> bool:
3143        """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
3144
3145        Raises:
3146            NotImplementedError: _description_
3147
3148        Returns:
3149            bool: _description_
3150        """        
3151        raise NotImplementedError
3152    
3153    def in_forground_work(self) -> bool:
3154        return True
3155
3156    def thrifty_in_work(self, result: bool) -> bool:
3157        if result:
3158            return True
3159        else:
3160            self.make_dead()
3161            return False
3162    
3163    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
3164        return False, None
3165    
3166    def is_low_latency(self) -> bool:
3167        return False
3168    
3169    def make_live(self):
3170        self._loop.make_service_live_fast(type(self), self.is_low_latency())
3171    
3172    def make_dead(self):
3173        self._loop.make_service_dead_fast(type(self))
3174    
3175    @staticmethod
3176    def service_id_impl():
3177        return None
3178    
3179    @staticmethod
3180    def service_id(service_type: Type):
3181        service_id = service_type.service_id_impl()
3182        if service_id is None:
3183            service_id = service_type.__name__
3184        
3185        return service_id
3186    
3187    def destroy(self):
3188        pass
3189
3190
3191class TypedService(Service, Generic[ServiceResponseTypeVar]):
3192    pass
class Counter:
 99class Counter:
100    def __init__(self):
101        self._index = -1  # type: int
102
103    def get(self) -> int:
104        self._index += 1
105        return self._index
def get(self) -> int:
103    def get(self) -> int:
104        self._index += 1
105        return self._index
class Iterable:
108class Iterable:
109    def iteration(self) -> bool:
110        """
111        should return False if ready to stop looping
112        :return:
113        """
114        raise NotImplementedError
def iteration(self) -> bool:
109    def iteration(self) -> bool:
110        """
111        should return False if ready to stop looping
112        :return:
113        """
114        raise NotImplementedError

should return False if ready to stop looping :return:

ServiceType = typing.Type[ForwardRef('Service')]
TypedServiceType = typing.Type[ForwardRef('TypedService[ServiceResponseTypeVar]')]
NormalizableServiceType = typing.Union[typing.Type[ForwardRef('Service')], typing.Type[ForwardRef('TypedService[ServiceResponseTypeVar]')], typing.Type[ForwardRef('ServiceRequest')], typing.Type[ForwardRef('TypedServiceRequest[ServiceResponseTypeVar]')], ForwardRef('Service'), ForwardRef('TypedService[ServiceResponseTypeVar]'), ForwardRef('ServiceRequest'), ForwardRef('TypedServiceRequest[ServiceResponseTypeVar]')]
ItemID = <class 'int'>
CoroID = <class 'int'>
Worker = typing.Union[typing.Callable[[ForwardRef('Interface')], typing.Any], typing.Callable[[ForwardRef('Interface')], typing.Awaitable[typing.Any]]]
GreenetCoro = <class 'greenlet.greenlet'>
ACoro = typing.Union[typing.Awaitable, typing.Coroutine, typing.Generator, typing.AsyncGenerator, typing.Callable]
Coro = typing.Union[greenlet.greenlet, typing.Awaitable, typing.Coroutine, typing.Generator, typing.AsyncGenerator, typing.Callable]
OnCoroDelHandler = typing.Callable[[ForwardRef('CoroWrapperBase')], bool]
def cs_coro( coro_worker: Callable[[~CoroParams], ~CoroResult]) -> collections.abc.Callable[Interface, ~CoroParams, ~CoroResult]:
145def cs_coro(coro_worker: Callable[CoroParams, CoroResult]) -> Callable[Concatenate['Interface', CoroParams], CoroResult]:
146    """Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter)
147
148    Args:
149        coro_worker (Callable): _description_
150    """
151    if is_async(coro_worker):
152        async def wrapper(i: Interface, *args, **kwargs):
153            return await coro_worker(*args, **kwargs)
154            
155        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
156        update_wrapper(wrapper, coro_worker)
157        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
158        # wrapper.__name__ = coro_worker.__name__
159        wrapper: coro_worker
160        return wrapper
161    else:
162        def wrapper(i: Interface, *args, **kwargs):
163            return coro_worker(*args, **kwargs)
164            
165        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
166        update_wrapper(wrapper, coro_worker)
167        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
168        # wrapper.__name__ = coro_worker.__name__
169        wrapper: coro_worker
170        return wrapper

Decorator. Without arguments. Makes a greenlet Cengal coroutine from a pain function (which don't have an interface parameter)

Args: coro_worker (Callable): _description_

def cs_acoro( coro_aworker: Callable[[~CoroParams], Awaitable[~CoroResult]]) -> collections.abc.Callable[Interface, ~CoroParams, typing.Awaitable[~CoroResult]]:
173def cs_acoro(coro_aworker: Callable[CoroParams, Awaitable[CoroResult]]) -> Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]:
174    """Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter)
175
176    Args:
177        coro_aworker (Callable): _description_
178    """    
179    if is_async(coro_aworker):
180        async def wrapper(i: Interface, *args, **kwargs):
181            return await coro_aworker(*args, **kwargs)
182            
183        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
184        update_wrapper(wrapper, coro_aworker)
185        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
186        # wrapper.__name__ = coro_aworker.__name__
187        wrapper: coro_aworker
188        return wrapper
189    else:
190        def wrapper(i: Interface, *args, **kwargs):
191            return coro_aworker(*args, **kwargs)
192            
193        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
194        update_wrapper(wrapper, coro_aworker)
195        wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
196        wrapper.__name__ = coro_aworker.__name__
197        wrapper: coro_aworker
198        return wrapper

Decorator. Without arguments. Makes an async Cengal coroutine from an async function (which don't have an interface parameter)

Args: coro_aworker (Callable): _description_

def cs_callable( coro_worker: collections.abc.Callable[Interface, ~CoroParams, ~CoroResult]) -> Callable[[~CoroParams], ~CoroResult]:
201def cs_callable(coro_worker: Callable[Concatenate['Interface', CoroParams], CoroResult]) -> Callable[CoroParams, CoroResult]:
202    """Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine
203
204    Args:
205        coro_worker (Callable): _description_
206    """
207    if is_async(coro_worker):
208        async def wrapper(*args, **kwargs):
209            i: Interface = current_interface()
210            return await coro_worker(i, *args, **kwargs)
211            
212        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
213        update_wrapper(wrapper, coro_worker)
214        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
215        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
216        # wrapper.__name__ = coro_worker.__name__
217        wrapper: coro_worker
218        return wrapper
219    else:
220        def wrapper(*args, **kwargs):
221            i: Interface = current_interface()
222            return coro_worker(i, *args, **kwargs)
223            
224        coro_worker_sign: inspect.Signature = inspect.signature(coro_worker)
225        update_wrapper(wrapper, coro_worker)
226        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
227        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
228        # wrapper.__name__ = coro_worker.__name__
229        wrapper: coro_worker
230        return wrapper

Decorator. Without arguments. Makes a callable sync Cengal coroutine (which don't have an interface parameter) from a sync Cengal coroutine

Args: coro_worker (Callable): _description_

def cs_acallable( coro_aworker: collections.abc.Callable[Interface, ~CoroParams, typing.Awaitable[~CoroResult]]) -> Callable[[~CoroParams], Awaitable[~CoroResult]]:
233def cs_acallable(coro_aworker: Callable[Concatenate['Interface', CoroParams], Awaitable[CoroResult]]) -> Callable[CoroParams, Awaitable[CoroResult]]:
234    """Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine
235
236    Args:
237        coro_aworker (Callable): _description_
238    """
239    if is_async(coro_aworker):
240        async def wrapper(*args, **kwargs):
241            i: Interface = current_interface()
242            return await coro_aworker(i, *args, **kwargs)
243            
244        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
245        update_wrapper(wrapper, coro_aworker)
246        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
247        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
248        # wrapper.__name__ = coro_aworker.__name__
249        wrapper: coro_aworker
250        return wrapper
251    else:
252        def wrapper(*args, **kwargs):
253            i: Interface = current_interface()
254            return coro_aworker(i, *args, **kwargs)
255            
256        coro_worker_sign: inspect.Signature = inspect.signature(coro_aworker)
257        update_wrapper(wrapper, coro_aworker)
258        # wrapper.__signature__ = coro_worker_sign.replace(parameters=(inspect.Parameter('_interface_param_', inspect.Parameter.POSITIONAL_ONLY),) + tuple(coro_worker_sign.parameters.values()), return_annotation=coro_worker_sign.return_annotation)
259        wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
260        # wrapper.__name__ = coro_aworker.__name__
261        wrapper: coro_aworker
262        return wrapper

Decorator. Without arguments. Makes a callable async Cengal coroutine (which don't have an interface parameter) from a async Cengal coroutine

Args: coro_aworker (Callable): _description_

class OutsideCoroSchedulerContext(builtins.Exception):
386class OutsideCoroSchedulerContext(Exception):
387    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def current_coro_scheduler() -> Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]:
390def current_coro_scheduler() -> 'CoroSchedulerType':
391    if _current_coro_scheduler.scheduler is None:
392        raise OutsideCoroSchedulerContext
393    
394    return _current_coro_scheduler.scheduler
def get_current_coro_scheduler() -> Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType]:
397def get_current_coro_scheduler() -> Optional['CoroSchedulerType']:
398    return _current_coro_scheduler.scheduler
def set_primary_coro_scheduler( coro_scheduler: Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]):
401def set_primary_coro_scheduler(coro_scheduler: 'CoroSchedulerType'):
402    _primary_coro_scheduler.scheduler = coro_scheduler
class PrimaryCoroSchedulerWasNotSet(builtins.Exception):
405class PrimaryCoroSchedulerWasNotSet(Exception):
406    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def primary_coro_scheduler() -> Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]:
409def primary_coro_scheduler() -> 'CoroSchedulerType':
410    if _primary_coro_scheduler.scheduler is None:
411        raise PrimaryCoroSchedulerWasNotSet
412    
413    return _primary_coro_scheduler.scheduler
def get_primary_coro_scheduler() -> Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType]:
416def get_primary_coro_scheduler() -> Optional['CoroSchedulerType']:
417    return _primary_coro_scheduler.scheduler
class CoroSchedulerContextIsNotAvailable(builtins.Exception):
420class CoroSchedulerContextIsNotAvailable(Exception):
421    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def available_coro_scheduler() -> Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]:
424def available_coro_scheduler() -> 'CoroSchedulerType':
425    if _current_coro_scheduler.scheduler:
426        return _current_coro_scheduler.scheduler
427    elif _primary_coro_scheduler.scheduler:
428        return _primary_coro_scheduler.scheduler
429    else:
430        raise CoroSchedulerContextIsNotAvailable
def get_available_coro_scheduler() -> Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType]:
433def get_available_coro_scheduler() -> Optional['CoroSchedulerType']:
434    if _current_coro_scheduler.scheduler:
435        return _current_coro_scheduler.scheduler
436    elif _primary_coro_scheduler.scheduler:
437        return _primary_coro_scheduler.scheduler
438    else:
439        return None
class WrongTypeOfShedulerError(builtins.Exception):
442class WrongTypeOfShedulerError(Exception):
443    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class InterfaceIsNotAvailableError(builtins.Exception):
446class InterfaceIsNotAvailableError(Exception):
447    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CurrentCoroIsNotAliveError(builtins.Exception):
450class CurrentCoroIsNotAliveError(Exception):
451    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def loop_with_backup_loop( backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]:
457def loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType':
458    if backup_scheduler is not None:
459        if not isinstance(backup_scheduler, CoroScheduler):
460            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
461
462    loop = CoroScheduler.current_loop()
463    if loop is None:
464        # Outside the loop
465        loop = get_available_coro_scheduler()
466        if loop is None:
467            loop = backup_scheduler
468
469    if loop is None:
470        raise CoroSchedulerContextIsNotAvailable
471    
472    return loop
def get_loop_with_backup_loop( backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType]:
475def get_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']:
476    if backup_scheduler is not None:
477        if not isinstance(backup_scheduler, CoroScheduler):
478            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
479
480    loop = CoroScheduler.current_loop()
481    if loop is None:
482        # Outside the loop
483        loop = get_available_coro_scheduler()
484        if loop is None:
485            loop = backup_scheduler
486    
487    return loop
def loop_with_explicit_loop( explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]:
490def loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'CoroSchedulerType':
491    loop = explicit_scheduler
492    current_loop = CoroScheduler.current_loop()
493    if loop is None:
494        loop = current_loop
495        if loop is None:
496            # Outside the loop
497            loop = get_available_coro_scheduler()
498    else:
499        if not isinstance(loop, CoroScheduler):
500            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
501
502    if loop is None:
503        raise CoroSchedulerContextIsNotAvailable
504    
505    return loop
def get_loop_with_explicit_loop( explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType]:
508def get_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['CoroSchedulerType']:
509    loop = explicit_scheduler
510    current_loop = CoroScheduler.current_loop()
511    if loop is None:
512        loop = current_loop
513        if loop is None:
514            # Outside the loop
515            loop = get_available_coro_scheduler()
516    else:
517        if not isinstance(loop, CoroScheduler):
518            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
519    
520    return loop
def interface_and_loop_with_backup_loop( backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> tuple[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], Interface, bool]:
524def interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]:
525    if backup_scheduler is not None:
526        if not isinstance(backup_scheduler, CoroScheduler):
527            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
528
529    loop = CoroScheduler.current_loop()
530    if loop is None:
531        # Outside the loop
532        interface = None
533        loop = get_available_coro_scheduler()
534        if loop is None:
535            loop = backup_scheduler
536    else:
537        # In the loop (in coroutine or in the service)
538        interface = loop.current_interface()
539
540    if loop is None:
541        raise CoroSchedulerContextIsNotAvailable
542    
543    if interface is None:
544        raise InterfaceIsNotAvailableError
545    
546    coro_alive: bool = False
547    if interface is not None:
548        if interface._coro:
549            coro_alive = True
550    
551    if not coro_alive:
552        raise CurrentCoroIsNotAliveError
553    
554    return loop, interface, coro_alive
def get_interface_and_loop_with_backup_loop( backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> tuple[typing.Union[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType], typing.Union[Interface, NoneType], bool]:
557def get_interface_and_loop_with_backup_loop(backup_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
558    if backup_scheduler is not None:
559        if not isinstance(backup_scheduler, CoroScheduler):
560            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
561
562    loop = CoroScheduler.current_loop()
563    if loop is None:
564        # Outside the loop
565        interface = None
566        loop = get_available_coro_scheduler()
567        if loop is None:
568            loop = backup_scheduler
569    else:
570        # In the loop (in coroutine or in the service)
571        interface = loop.current_interface()
572    
573    coro_alive: bool = False
574    if interface is not None:
575        if interface._coro:
576            coro_alive = True
577    
578    return loop, interface, coro_alive
def interface_and_loop_with_explicit_loop( explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> tuple[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], Interface, bool]:
581def interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple['CoroSchedulerType', 'Interface', bool]:
582    loop = explicit_scheduler
583    current_loop = CoroScheduler.current_loop()
584    interface = None
585    if loop is None:
586        loop = current_loop
587        if loop is None:
588            # Outside the loop
589            loop = get_available_coro_scheduler()
590        else:
591            # In the loop (in coroutine or in the service)
592            interface = loop.current_interface()
593    else:
594        if isinstance(loop, CoroScheduler):
595            if loop is current_loop:
596                interface = loop.current_interface()
597        else:
598            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
599
600    if loop is None:
601        raise CoroSchedulerContextIsNotAvailable
602    
603    if interface is None:
604        raise InterfaceIsNotAvailableError
605    
606    coro_alive: bool = False
607    if interface is not None:
608        if interface._coro:
609            coro_alive = True
610    
611    if not coro_alive:
612        raise CurrentCoroIsNotAliveError
613    
614    return loop, interface, coro_alive
def get_interface_and_loop_with_explicit_loop( explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> tuple[typing.Union[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType], typing.Union[Interface, NoneType], bool]:
617def get_interface_and_loop_with_explicit_loop(explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
618    loop = explicit_scheduler
619    current_loop = CoroScheduler.current_loop()
620    interface = None
621    if loop is None:
622        loop = current_loop
623        if loop is None:
624            # Outside the loop
625            loop = get_available_coro_scheduler()
626        else:
627            # In the loop (in coroutine or in the service)
628            interface = loop.current_interface()
629    else:
630        if isinstance(loop, CoroScheduler):
631            if loop is current_loop:
632                interface = loop.current_interface()
633        else:
634            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
635    
636    coro_alive: bool = False
637    if interface is not None:
638        if interface._coro:
639            coro_alive = True
640    
641    return loop, interface, coro_alive
def interface_for_an_explicit_loop( explicit_scheduler: Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]) -> tuple[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], Interface, bool]:
644def interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple['CoroSchedulerType', 'Interface', bool]:
645    loop = explicit_scheduler
646    interface = None
647    if loop is None:
648        raise CoroSchedulerContextIsNotAvailable
649    else:
650        if isinstance(loop, CoroScheduler):
651            if loop is CoroScheduler.current_loop():
652                interface = loop.current_interface()
653        else:
654            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
655    
656    if interface is None:
657        raise InterfaceIsNotAvailableError
658    
659    coro_alive: bool = False
660    if interface is not None:
661        if interface._coro:
662            coro_alive = True
663    
664    if not coro_alive:
665        raise CurrentCoroIsNotAliveError
666    
667    return loop, interface, coro_alive
def get_interface_for_an_explicit_loop( explicit_scheduler: Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]) -> tuple[typing.Union[typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType], typing.Union[Interface, NoneType], bool]:
670def get_interface_for_an_explicit_loop(explicit_scheduler: 'CoroSchedulerType') -> Tuple[Optional['CoroSchedulerType'], Optional['Interface'], bool]:
671    loop = explicit_scheduler
672    interface = None
673    if loop is not None:
674        if isinstance(loop, CoroScheduler):
675            if loop is CoroScheduler.current_loop():
676                interface = loop.current_interface()
677        else:
678            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
679    
680    coro_alive: bool = False
681    if interface is not None:
682        if interface._coro:
683            coro_alive = True
684    
685    return loop, interface, coro_alive
def service_with_backup_loop( service_type: type[Service], backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Service:
690def service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
691    if backup_scheduler is not None:
692        if not isinstance(backup_scheduler, CoroScheduler):
693            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
694
695    loop = CoroScheduler.current_loop()
696    if loop is None:
697        # Outside the loop
698        loop = get_available_coro_scheduler()
699        if loop is None:
700            loop = backup_scheduler
701
702    if loop is None:
703        raise CoroSchedulerContextIsNotAvailable
704
705    return loop.get_service_instance(service_type)
def get_service_with_backup_loop( service_type: type[Service], backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Service, NoneType]:
708def get_service_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
709    if backup_scheduler is not None:
710        if not isinstance(backup_scheduler, CoroScheduler):
711            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
712
713    loop = CoroScheduler.current_loop()
714    if loop is None:
715        # Outside the loop
716        loop = get_available_coro_scheduler()
717        if loop is None:
718            loop = backup_scheduler
719
720    if loop is None:
721        return None
722    else:
723        return loop.get_service_instance(service_type)
def service_with_explicit_loop( service_type: type[Service], explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Service:
726def service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
727    loop = explicit_scheduler
728    current_loop = CoroScheduler.current_loop()
729    if loop is None:
730        loop = current_loop
731        if loop is None:
732            # Outside the loop
733            loop = get_available_coro_scheduler()
734    else:
735        if not isinstance(loop, CoroScheduler):
736            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
737
738    if loop is None:
739        raise CoroSchedulerContextIsNotAvailable
740
741    return loop.get_service_instance(service_type)
def get_service_with_explicit_loop( service_type: type[Service], explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Service, NoneType]:
744def get_service_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
745    loop = explicit_scheduler
746    current_loop = CoroScheduler.current_loop()
747    if loop is None:
748        loop = current_loop
749        if loop is None:
750            # Outside the loop
751            loop = get_available_coro_scheduler()
752    else:
753        if not isinstance(loop, CoroScheduler):
754            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
755
756    if loop is None:
757        return None
758    else:
759        return loop.get_service_instance(service_type)
def service_fast_with_backup_loop( service_type: type[Service], backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Service:
763def service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
764    if backup_scheduler is not None:
765        if not isinstance(backup_scheduler, CoroScheduler):
766            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
767
768    loop = CoroScheduler.current_loop()
769    if loop is None:
770        # Outside the loop
771        loop = get_available_coro_scheduler()
772        if loop is None:
773            loop = backup_scheduler
774
775    if loop is None:
776        raise CoroSchedulerContextIsNotAvailable
777
778    return loop.get_service_instance_fast(service_type)
def get_service_fast_with_backup_loop( service_type: type[Service], backup_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Service, NoneType]:
781def get_service_fast_with_backup_loop(service_type: Type['Service'], backup_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
782    if backup_scheduler is not None:
783        if not isinstance(backup_scheduler, CoroScheduler):
784            raise WrongTypeOfShedulerError(f'Wrong type of the backup_scheduler ({repr(backup_scheduler)}): {type(backup_scheduler)}')
785
786    loop = CoroScheduler.current_loop()
787    if loop is None:
788        # Outside the loop
789        loop = get_available_coro_scheduler()
790        if loop is None:
791            loop = backup_scheduler
792
793    if loop is None:
794        return None
795    else:
796        return loop.get_service_instance_fast(service_type)
def service_fast_with_explicit_loop( service_type: type[Service], explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Service:
799def service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> 'Service':
800    loop = explicit_scheduler
801    current_loop = CoroScheduler.current_loop()
802    if loop is None:
803        loop = current_loop
804        if loop is None:
805            # Outside the loop
806            loop = get_available_coro_scheduler()
807    else:
808        if not isinstance(loop, CoroScheduler):
809            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
810
811    if loop is None:
812        raise CoroSchedulerContextIsNotAvailable
813
814    return loop.get_service_instance_fast(service_type)
def get_service_fast_with_explicit_loop( service_type: type[Service], explicit_scheduler: Union[Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable], NoneType] = None) -> Union[Service, NoneType]:
817def get_service_fast_with_explicit_loop(service_type: Type['Service'], explicit_scheduler: Optional['CoroSchedulerType'] = None) -> Optional['Service']:
818    loop = explicit_scheduler
819    current_loop = CoroScheduler.current_loop()
820    if loop is None:
821        loop = current_loop
822        if loop is None:
823            # Outside the loop
824            loop = get_available_coro_scheduler()
825    else:
826        if not isinstance(loop, CoroScheduler):
827            raise WrongTypeOfShedulerError(f'Wrong type of the explicit_scheduler ({repr(loop)}): {type(loop)}')
828
829    if loop is None:
830        return None
831    else:
832        return loop.get_service_instance_fast(service_type)
class CoroType(enum.Enum):
836class CoroType(Enum):
837    auto = 0
838    awaitable = 1
839    greenlet = 2

An enumeration.

auto = <CoroType.auto: 0>
awaitable = <CoroType.awaitable: 1>
greenlet = <CoroType.greenlet: 2>
Inherited Members
enum.Enum
name
value
class ExplicitWorker:
842class ExplicitWorker:
843    def __init__(self, coro_type: CoroType, worker: Worker) -> None:
844        self.coro_type: CoroType = coro_type
845        self.worker: Worker = worker
ExplicitWorker( coro_type: CoroType, worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]])
843    def __init__(self, coro_type: CoroType, worker: Worker) -> None:
844        self.coro_type: CoroType = coro_type
845        self.worker: Worker = worker
coro_type: CoroType
worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]]
AnyWorker = typing.Union[ExplicitWorker, typing.Callable[[ForwardRef('Interface')], typing.Any], typing.Callable[[ForwardRef('Interface')], typing.Awaitable[typing.Any]]]
CoroScheduler = <class 'CoroSchedulerGreenlet'>
CoroSchedulerType: typing_extensions.TypeAlias = typing.Union[CoroSchedulerGreenlet, CoroSchedulerAwaitable]
class CoroSchedulerGreenlet(CoroSchedulerBase):
1898class CoroSchedulerGreenlet(CoroSchedulerBase):
1899    def __init__(self, logger: Optional[logging.Logger] = None):
1900        super().__init__(logger)
1901        self.root_coro_loop = greenlet(self._loop_imp)            # type: Coro
1902        # self.root_coro_iteration = greenlet(self._iteration_imp)  # type: Coro
1903        self.root_coro_iteration = greenlet(self._iter_wrapper)  # type: Coro
1904        # self._root_coro = None                                     # type: Optional[Coro]
1905        self.root_coro = None                                     # type: Optional[Coro]
1906        self._coroutines_can_switch_directly = True
1907    
1908    def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType:
1909        return coro_type or CoroType.auto
1910
1911    def loop(self):
1912        if self.need_to_log_loop_start_and_end:
1913            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1914        
1915        self.root_coro = self.root_coro_loop
1916
1917        # if __debug__: dlog('Switch to root_coro')
1918        self.root_coro.switch()
1919        # if __debug__: dlog('Switch from root_coro')
1920        self.root_coro = None
1921        self.root_coro_loop = greenlet(self._loop_imp)
1922    
1923    def iteration(self):
1924        # global _debug_log_counter
1925        # last_debug_log_counter = _debug_log_counter
1926        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1927        
1928        self.root_coro = self.root_coro_iteration
1929        in_work = False
1930        try:
1931            # if __debug__: dlog('Switch to root_coro (iteration)')
1932            in_work = self.root_coro.switch(True)
1933            # if __debug__: dlog('Switch from root_coro (iteration)')
1934        except GreenletExit:
1935            self.stop_iteration()
1936            # if __debug__: dlog('Switch from root_coro (GreenletExit)')
1937        except:
1938            self.stop_iteration()
1939            # if __debug__: dlog('Switch from root_coro (Exception):')
1940            # if __debug__: dlog(sys.exc_info())
1941            raise
1942        finally:
1943            self.root_coro = None
1944            # if __debug__:
1945            #     if last_debug_log_counter < _debug_log_counter:
1946            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1947        
1948        return in_work
1949    
1950    def stop_iteration(self):
1951        try:
1952            in_work = self.root_coro.switch(False)
1953        except GreenletExit:
1954            self.stop_iteration()
1955            # if __debug__: dlog('Stop root_coro (GreenletExit)')
1956        except:
1957            self.stop_iteration()
1958            # if __debug__: dlog('Stop root_coro (Exception):')
1959            # if __debug__: dlog(sys.exc_info())
1960            raise
1961        finally:
1962            self.root_coro = None
1963            self.root_coro_iteration = greenlet(self._iter_wrapper)
1964    
1965    def _iter_wrapper(self, proceed: bool=True):
1966        try:
1967            while proceed:
1968                self._iteration_imp()
1969                proceed = greenlet.getcurrent().parent.switch(self.in_work())
1970        except CoroSchedulerDestroyRequestedException:
1971            pass
1972        finally:
1973            self.destroy()
CoroSchedulerGreenlet(logger: Union[logging.Logger, NoneType] = None)
1899    def __init__(self, logger: Optional[logging.Logger] = None):
1900        super().__init__(logger)
1901        self.root_coro_loop = greenlet(self._loop_imp)            # type: Coro
1902        # self.root_coro_iteration = greenlet(self._iteration_imp)  # type: Coro
1903        self.root_coro_iteration = greenlet(self._iter_wrapper)  # type: Coro
1904        # self._root_coro = None                                     # type: Optional[Coro]
1905        self.root_coro = None                                     # type: Optional[Coro]
1906        self._coroutines_can_switch_directly = True
root_coro_loop
root_coro_iteration
root_coro
def loop(self):
1911    def loop(self):
1912        if self.need_to_log_loop_start_and_end:
1913            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1914        
1915        self.root_coro = self.root_coro_loop
1916
1917        # if __debug__: dlog('Switch to root_coro')
1918        self.root_coro.switch()
1919        # if __debug__: dlog('Switch from root_coro')
1920        self.root_coro = None
1921        self.root_coro_loop = greenlet(self._loop_imp)
def iteration(self):
1923    def iteration(self):
1924        # global _debug_log_counter
1925        # last_debug_log_counter = _debug_log_counter
1926        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1927        
1928        self.root_coro = self.root_coro_iteration
1929        in_work = False
1930        try:
1931            # if __debug__: dlog('Switch to root_coro (iteration)')
1932            in_work = self.root_coro.switch(True)
1933            # if __debug__: dlog('Switch from root_coro (iteration)')
1934        except GreenletExit:
1935            self.stop_iteration()
1936            # if __debug__: dlog('Switch from root_coro (GreenletExit)')
1937        except:
1938            self.stop_iteration()
1939            # if __debug__: dlog('Switch from root_coro (Exception):')
1940            # if __debug__: dlog(sys.exc_info())
1941            raise
1942        finally:
1943            self.root_coro = None
1944            # if __debug__:
1945            #     if last_debug_log_counter < _debug_log_counter:
1946            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1947        
1948        return in_work

should return False if ready to stop looping :return:

def stop_iteration(self):
1950    def stop_iteration(self):
1951        try:
1952            in_work = self.root_coro.switch(False)
1953        except GreenletExit:
1954            self.stop_iteration()
1955            # if __debug__: dlog('Stop root_coro (GreenletExit)')
1956        except:
1957            self.stop_iteration()
1958            # if __debug__: dlog('Stop root_coro (Exception):')
1959            # if __debug__: dlog(sys.exc_info())
1960            raise
1961        finally:
1962            self.root_coro = None
1963            self.root_coro_iteration = greenlet(self._iter_wrapper)
Inherited Members
CoroSchedulerBase
in_iteration
services
live_services
live_low_latency_services
requests
responses
new_born_coroutines
coroutines
coro_counter
services_in_work
services_in_foreground_work
services_in_active_state
services_in_active_state_list
time_left_before_next_event
coro_on_del_handlers
current_coro_interface
current_coro_wrapper
suppress_coro_exceptions
iteration_index
context_switches
current_coro_start_time
coro_execution_time
coro_longest_execution_time
loop_iteration_start_time
need_to_measure_loop_iteration_time
need_to_measure_coro_time
need_to_gather_coro_history
permitted_use_put_coro_from_coro
get_coro_start_time
get_loop_iteration_start_time
coro_history_gatherer
on_woke_up_callback
coro_on_start_handlers
global_on_start_handlers_turned_on
execute_global_on_start_handlers
coro_workers_history
coro_full_history
run_low_latency_services
services_impostrors
keep_coro_execution_time_between_iterations
keep_coro_longest_execution_time_between_iterations
keep_coro_workers_history_between_iterations
keep_coro_full_history_between_iterations
loop_iteration_delta_time
suppress_warnings_about_responses_to_not_existant_coroutines
use_internal_sleep
high_cpu_utilisation_mode
foreground_coro_num
on_idle_handlers
idle_managers_num
on_wrong_request
get_service_by_type
get_service_instance_fast
get_service_instance
make_service_live_fast
make_service_live
make_service_dead_fast
make_service_dead
sliding_window
need_to_log_loop_start_and_end
on_destroyed_handlers
get_entity_stats
log_exc
get_service_instance_fast_impl
get_service_instance_impl
make_service_live_fast_impl
make_service_live_impl
make_service_dead_fast_impl
make_service_dead_impl
get_service_instance_fast_impostor_impl
get_service_instance_impostor_impl
make_service_live_fast_impostor_impl
make_service_live_impostor_impl
make_service_dead_fast_impostor_impl
make_service_dead_impostor_impl
set_coro_time_measurement
set_coro_history_gathering
set_loop_iteration_time_measurement
put_coro_fast
put_coro
register_service
is_service_registered
unregister_service
destroy_services
get_service_by_type_impl
get_service_by_type_impostor_impl
register_service_impostor
find_new_born_coroutine
get_coro
purify_worker
set_global_on_start_handlers
add_on_global_on_start_handler
process_coro_exit_status
kill_coro
throw_coro
forget_coro_by_id
find_coro_by_id
del_coro_by_id
kill_coro_by_id
throw_coro_by_id
request_coro_throw_by_id
request_coro_close_by_id
turn_on_embedded_mode
add_global_on_coro_del_handler
restore_global_state
current_loop
current_interface
current_wrapper
destroy_all_coroutines
destroy_new_born_coroutines
destroy_coroutines
destroy
in_work
is_awake
is_idle
next_event_after
class CoroSchedulerAwaitable(CoroSchedulerBase):
1976class CoroSchedulerAwaitable(CoroSchedulerBase):
1977    def __init__(self, logger: Optional[logging.Logger] = None):
1978        super().__init__(logger)
1979        self.root_coro = None                                     # type: Optional[Coro]
1980    
1981    def _new_coro_type_normalizer(self, coro_type: Optional[CoroType]) -> CoroType:
1982        return CoroType.awaitable
1983
1984    def loop(self):
1985        if self.need_to_log_loop_start_and_end:
1986            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1987        
1988        self._loop_imp()
1989    
1990    def iteration(self):
1991        # global _debug_log_counter
1992        # last_debug_log_counter = _debug_log_counter
1993        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1994        
1995        need_to_destroy = False
1996        try:
1997            self._iteration_imp()
1998            return self.in_work()
1999        except CoroSchedulerDestroyRequestedException:
2000            need_to_destroy = True
2001        except:
2002            need_to_destroy = True
2003            raise
2004        finally:
2005            # if __debug__:
2006            #     if last_debug_log_counter < _debug_log_counter:
2007            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
2008            
2009            if need_to_destroy:
2010                self.destroy()
CoroSchedulerAwaitable(logger: Union[logging.Logger, NoneType] = None)
1977    def __init__(self, logger: Optional[logging.Logger] = None):
1978        super().__init__(logger)
1979        self.root_coro = None                                     # type: Optional[Coro]
root_coro
def loop(self):
1984    def loop(self):
1985        if self.need_to_log_loop_start_and_end:
1986            self.logger.info(f'{datetime.now()} >> {type(self).__name__} loop started...')
1987        
1988        self._loop_imp()
def iteration(self):
1990    def iteration(self):
1991        # global _debug_log_counter
1992        # last_debug_log_counter = _debug_log_counter
1993        # if __debug__: dlog(f'^ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
1994        
1995        need_to_destroy = False
1996        try:
1997            self._iteration_imp()
1998            return self.in_work()
1999        except CoroSchedulerDestroyRequestedException:
2000            need_to_destroy = True
2001        except:
2002            need_to_destroy = True
2003            raise
2004        finally:
2005            # if __debug__:
2006            #     if last_debug_log_counter < _debug_log_counter:
2007            #         if __debug__: dlog(f'_ λ ({id(self)}, {self}). root_coro: {self.root_coro}; root_coro_iteration: {self.root_coro_iteration}; current_loop: {self.current_loop()}; current_interface: {self.current_interface()}')
2008            
2009            if need_to_destroy:
2010                self.destroy()

should return False if ready to stop looping :return:

Inherited Members
CoroSchedulerBase
in_iteration
services
live_services
live_low_latency_services
requests
responses
new_born_coroutines
coroutines
coro_counter
services_in_work
services_in_foreground_work
services_in_active_state
services_in_active_state_list
time_left_before_next_event
coro_on_del_handlers
current_coro_interface
current_coro_wrapper
suppress_coro_exceptions
iteration_index
context_switches
current_coro_start_time
coro_execution_time
coro_longest_execution_time
loop_iteration_start_time
need_to_measure_loop_iteration_time
need_to_measure_coro_time
need_to_gather_coro_history
permitted_use_put_coro_from_coro
get_coro_start_time
get_loop_iteration_start_time
coro_history_gatherer
on_woke_up_callback
coro_on_start_handlers
global_on_start_handlers_turned_on
execute_global_on_start_handlers
coro_workers_history
coro_full_history
run_low_latency_services
services_impostrors
keep_coro_execution_time_between_iterations
keep_coro_longest_execution_time_between_iterations
keep_coro_workers_history_between_iterations
keep_coro_full_history_between_iterations
loop_iteration_delta_time
suppress_warnings_about_responses_to_not_existant_coroutines
use_internal_sleep
high_cpu_utilisation_mode
foreground_coro_num
on_idle_handlers
idle_managers_num
on_wrong_request
get_service_by_type
get_service_instance_fast
get_service_instance
make_service_live_fast
make_service_live
make_service_dead_fast
make_service_dead
sliding_window
need_to_log_loop_start_and_end
on_destroyed_handlers
get_entity_stats
log_exc
get_service_instance_fast_impl
get_service_instance_impl
make_service_live_fast_impl
make_service_live_impl
make_service_dead_fast_impl
make_service_dead_impl
get_service_instance_fast_impostor_impl
get_service_instance_impostor_impl
make_service_live_fast_impostor_impl
make_service_live_impostor_impl
make_service_dead_fast_impostor_impl
make_service_dead_impostor_impl
set_coro_time_measurement
set_coro_history_gathering
set_loop_iteration_time_measurement
put_coro_fast
put_coro
register_service
is_service_registered
unregister_service
destroy_services
get_service_by_type_impl
get_service_by_type_impostor_impl
register_service_impostor
find_new_born_coroutine
get_coro
purify_worker
set_global_on_start_handlers
add_on_global_on_start_handler
process_coro_exit_status
kill_coro
throw_coro
forget_coro_by_id
find_coro_by_id
del_coro_by_id
kill_coro_by_id
throw_coro_by_id
request_coro_throw_by_id
request_coro_close_by_id
turn_on_embedded_mode
add_global_on_coro_del_handler
restore_global_state
current_loop
current_interface
current_wrapper
destroy_all_coroutines
destroy_new_born_coroutines
destroy_coroutines
destroy
in_work
is_awake
is_idle
next_event_after
def current_interface() -> Union[Interface, NoneType]:
2019def current_interface() -> Optional['Interface']:
2020    return current_coro_scheduler().current_interface()
def execute_coro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2023def execute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2024    return coro_worker(current_interface(), *args, **kwargs)
def exec_coro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2023def execute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2024    return coro_worker(current_interface(), *args, **kwargs)
def ecoro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2023def execute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2024    return coro_worker(current_interface(), *args, **kwargs)
async def aexecute_coro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2031async def aexecute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2032    return await coro_worker(current_interface(), *args, **kwargs)
async def aexec_coro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2031async def aexecute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2032    return await coro_worker(current_interface(), *args, **kwargs)
async def aecoro( coro_worker: Union[ExplicitWorker, collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs) -> Any:
2031async def aexecute_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
2032    return await coro_worker(current_interface(), *args, **kwargs)
@contextmanager
def around_await():
2039@contextmanager
2040def around_await():
2041    """
2042    It is very bad idea to await from inside the greenlet since it might lead to problems with stack
2043    state in an outside awaitable code and it's loop.
2044    Must be run around `await ...` from inside Coro or Service when running from inside external async loop
2045    :return:
2046    """
2047    loop = _current_coro_scheduler.scheduler
2048    try:
2049        yield
2050    finally:
2051        _current_coro_scheduler.scheduler = loop

It is very bad idea to await from inside the greenlet since it might lead to problems with stack state in an outside awaitable code and it's loop. Must be run around await ... from inside Coro or Service when running from inside external async loop :return:

class Request:
2054class Request:
2055    def __init__(self, coro: 'CoroWrapperBase', service_type: ServiceType, *args, **kwargs):
2056        self.coro = coro
2057        self.coro_id = coro.coro_id
2058        self.service_type = service_type
2059        self.args = args
2060        self.kwargs = kwargs
2061    
2062    def __repr__(self):
2063        return f'<{self.__class__.__name__}(coro: {self.coro}, coro_id: {self.coro_id}, service_type: {self.service_type}, args: {self.args}, kwargs: {self.kwargs})>'
Request( coro: CoroWrapperBase, service_type: type[Service], *args, **kwargs)
2055    def __init__(self, coro: 'CoroWrapperBase', service_type: ServiceType, *args, **kwargs):
2056        self.coro = coro
2057        self.coro_id = coro.coro_id
2058        self.service_type = service_type
2059        self.args = args
2060        self.kwargs = kwargs
coro
coro_id
service_type
args
kwargs
class Response:
2066class Response:
2067    def __init__(self, coro_id: CoroID, service_type: ServiceType, response: Any, exception: Optional[BaseException]=None):
2068        self.coro_id = coro_id
2069        self.service_type = service_type
2070        self.response = response
2071        self.exception = exception
2072
2073    def __call__(self) -> Any:
2074        if self.exception:
2075            raise self.exception
2076        return self.response
2077    
2078    def __repr__(self):
2079        return f'<{self.__class__.__name__}(coro_id: {self.coro_id}, service_type: {self.service_type}, response: {self.response}, exception: {self.exception})>'
Response( coro_id: int, service_type: type[Service], response: Any, exception: Union[BaseException, NoneType] = None)
2067    def __init__(self, coro_id: CoroID, service_type: ServiceType, response: Any, exception: Optional[BaseException]=None):
2068        self.coro_id = coro_id
2069        self.service_type = service_type
2070        self.response = response
2071        self.exception = exception
coro_id
service_type
response
exception
class DirectResponse(Response):
2082class DirectResponse(Response):
2083    pass
class Interface:
2680class Interface:
2681    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2682        self._loop: CoroSchedulerBase = loop                        # type: CoroSchedulerBase
2683        self._coro: CoroWrapperBase = coro                        # type: CoroWrapperBase
2684        self.coro_id: CoroID = coro.coro_id               # type: CoroID
2685        self.in_work: bool = False
2686        self.ignored_by_watchdog: bool = False
2687        self.logger: logging.Logger = self._loop.logger
2688        self.log: logging.Logger = self.logger
2689    
2690    @contextmanager
2691    def ignore_by_watchdog(self):
2692        current_state = self.ignored_by_watchdog
2693        self.ignored_by_watchdog = True
2694        try:
2695            yield
2696        finally:
2697            self.ignored_by_watchdog = current_state
2698    
2699    def _normalize_call_args_kwargs(self, service_type: Union[ServiceType, 'ServiceRequest'], args, kwargs) -> Tuple[Type['Service'], Tuple, Dict]:
2700        if inspect.isclass(service_type):
2701            if issubclass(service_type, Service):
2702                return service_type, args, kwargs
2703            elif issubclass(service_type, ServiceRequest):
2704                # return service_type, args, kwargs
2705                return self._normalize_call_args_kwargs(service_type()(*args, **kwargs))
2706            else:
2707                print(f'service_type of an unsupported type: {service_type}')
2708                raise TypeError(f'service_type of an unsupported type: {service_type}')
2709        elif isinstance(service_type, ServiceRequest):
2710            request: ServiceRequest = service_type
2711            service_type = request.default_service_type
2712            if service_type is None:
2713                print(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly')
2714                raise RuntimeError(f'Service request class {type(request)} have no default service assigned. Please provide service_type explicitly')
2715            else:
2716                args, kwargs = args_kwargs(request, *args, **kwargs)
2717                return service_type, args, kwargs
2718        else:
2719            print(f'{service_type=}')
2720            raise ValueError(f'{service_type=}')
2721
2722    # def __call__(self, service_type: ServiceType, *args, **kwargs) -> Any:
2723    #     """
2724    #     Should be called from inside coroutines only.
2725    #     Will request some long running work to some service.
2726
2727    #     :param coro_id:
2728    #     :param service_type:
2729    #     :param args:
2730    #     :param kwargs:
2731    #     """
2732
2733    #     response = self.__put_task_method(self.__coro, service_type, *args, *kwargs)
2734    #     return response()
2735
2736    @overload
2737    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2738
2739    @overload
2740    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2741
2742    @overload
2743    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2744
2745    @overload
2746    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2747
2748    @overload
2749    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2750
2751    @overload
2752    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2753
2754    @overload
2755    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2756
2757    @overload
2758    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2759
2760    def __call__(self, service_type, *args, **kwargs):
2761        raise NotImplementedError
2762
2763    # !!! Must not be present in interface since leads to incorrect greenlets switches
2764    # def put_coro(self, coro_type: CoroType, coro_worker: Worker, *args, **kwargs) -> CoroID:
2765    #     self.in_work = True
2766    #     coro = self._loop.put_coro(coro_type, coro_worker, *args, **kwargs)
2767    #     self.in_work = False
2768    #     return coro.coro_id
2769
2770    def destroy(self):
2771        self._loop = None
2772        self._coro = None
2773        self.coro_id = None
2774        self.in_work = None
Interface( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro: CoroWrapperBase)
2681    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2682        self._loop: CoroSchedulerBase = loop                        # type: CoroSchedulerBase
2683        self._coro: CoroWrapperBase = coro                        # type: CoroWrapperBase
2684        self.coro_id: CoroID = coro.coro_id               # type: CoroID
2685        self.in_work: bool = False
2686        self.ignored_by_watchdog: bool = False
2687        self.logger: logging.Logger = self._loop.logger
2688        self.log: logging.Logger = self.logger
coro_id: int
in_work: bool
ignored_by_watchdog: bool
logger: logging.Logger
log: logging.Logger
@contextmanager
def ignore_by_watchdog(self):
2690    @contextmanager
2691    def ignore_by_watchdog(self):
2692        current_state = self.ignored_by_watchdog
2693        self.ignored_by_watchdog = True
2694        try:
2695            yield
2696        finally:
2697            self.ignored_by_watchdog = current_state
def destroy(self):
2770    def destroy(self):
2771        self._loop = None
2772        self._coro = None
2773        self.coro_id = None
2774        self.in_work = None
class InterfaceGreenlet(Interface):
2777class InterfaceGreenlet(Interface):
2778    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2779        super(InterfaceGreenlet, self).__init__(loop, coro)
2780        self.result = None
2781
2782    @overload
2783    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2784
2785    @overload
2786    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2787
2788    @overload
2789    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2790
2791    @overload
2792    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2793
2794    @overload
2795    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2796
2797    @overload
2798    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2799
2800    @overload
2801    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2802
2803    @overload
2804    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2805
2806    def __call__(self, service_type, *args, **kwargs):
2807        """
2808        Should be called from inside coroutines only.
2809        Will request some long running work to some service.
2810
2811        :param coro_id:
2812        :param service_type:
2813        :param args:
2814        :param kwargs:
2815        """
2816
2817        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2818
2819        self.in_work = True
2820        # if __debug__: dlog(f'λ <= (request): <{func_info(self._coro.worker.worker, False)}>: {service_type}, {args}, {kwargs}')
2821        try:
2822            loop: CoroSchedulerGreenlet = self._loop
2823            response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs))
2824            # TODO: we will switch to the next coro and do necessary preparations and reactions
2825            # if loop._coroutines_can_switch_directly:
2826            #     if loop._is_current_coro_was_new_born:  
2827            #         ...
2828            #     else:
2829            #         ...
2830            # else:
2831            #     response: Response = loop.root_coro.switch(Request(self._coro, service_type, *args, **kwargs))
2832        except AttributeError:
2833            # if __debug__: dlog(f'x λ: {id(self._loop)}, {self._loop}, root_coro: {self._loop.root_coro}; root_coro_iteration: {self._loop.root_coro_iteration}; current_loop: {self._loop.current_loop()}; current_interface: {self._loop.current_interface()}')
2834            raise
2835        # if __debug__: dlog(f'λ => (response): <{func_info(self._coro.worker.worker)}>: {repr(response)}')
2836        self.in_work = False
2837        if isinstance(response, Response):
2838            return response()
2839        
2840        dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}")
2841        raise RuntimeError(f'Wrong type of response from the service: {type(response)}; {repr(response)}.')
2842    
2843    def register_result(self, result: Any):
2844        self.result = result
2845
2846    def destroy(self):
2847        self.result = None
2848        return super().destroy()
InterfaceGreenlet( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro: CoroWrapperBase)
2778    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2779        super(InterfaceGreenlet, self).__init__(loop, coro)
2780        self.result = None
result
def register_result(self, result: Any):
2843    def register_result(self, result: Any):
2844        self.result = result
def destroy(self):
2846    def destroy(self):
2847        self.result = None
2848        return super().destroy()
def find_coro_type( entity) -> CoroType:
2649def find_coro_type(entity) -> CoroType:
2650    if isinstance(entity, EntityArgsHolder):
2651        entity, args, kwargs = entity.entity_args_kwargs()
2652
2653    if inspect.iscoroutine(entity) or inspect.isgenerator(entity) or inspect.iscoroutinefunction(entity) or inspect.isgeneratorfunction(entity) or inspect.isasyncgen(entity) or inspect.isasyncgenfunction(entity) or inspect.isawaitable(entity):
2654        return CoroType.awaitable
2655    elif callable(entity):
2656        return CoroType.greenlet
2657    else:
2658        raise TypeError(f'{entity} is neither an awaitable nor a greenlet')
class InterfaceAsyncAwait(Interface):
2851class InterfaceAsyncAwait(Interface):
2852    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2853        super(InterfaceAsyncAwait, self).__init__(loop, coro)
2854
2855    @overload
2856    async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2857
2858    @overload
2859    async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2860
2861    @overload
2862    async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2863
2864    @overload
2865    async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2866
2867    @overload
2868    async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2869
2870    @overload
2871    async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2872
2873    @overload
2874    async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2875
2876    @overload
2877    async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2878
2879    async def __call__(self, service_type, *args, **kwargs):
2880        """
2881        Should be called from inside coroutines only.
2882        Will request some long running work to some service.
2883
2884        :param coro_id:
2885        :param service_type:
2886        :param args:
2887        :param kwargs:
2888        """
2889
2890        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2891
2892        self.in_work = True
2893        response = await yield_task_from_asyncawait(Request(self._coro, service_type, *args, **kwargs))
2894        self.in_work = False
2895        if isinstance(response, Response):
2896            return response()
2897        
2898        dlog(f"ERROR:\n\tRESPONSE TYPE: {type(response)}\n\tRESPONSE REPR: {repr(response)}\n\tRESPONSE STR: {str(response)}")
2899        raise RuntimeError('Wrong type of response from the service')
InterfaceAsyncAwait( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro: CoroWrapperBase)
2852    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2853        super(InterfaceAsyncAwait, self).__init__(loop, coro)
class InterfaceFake(Interface):
2902class InterfaceFake(Interface):
2903    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2904        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2905        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2906        self.coro_id: CoroID = None                      # type: CoroID
2907        self.in_work: bool = False
2908
2909    @overload
2910    def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2911
2912    @overload
2913    def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2914
2915    @overload
2916    def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2917
2918    @overload
2919    def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2920
2921    @overload
2922    def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2923
2924    @overload
2925    def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2926
2927    @overload
2928    def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2929
2930    @overload
2931    def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2932
2933    def __call__(self, service_type, *args, **kwargs) -> Any:
2934        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2935        return None
InterfaceFake( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro: CoroWrapperBase)
2903    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2904        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2905        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2906        self.coro_id: CoroID = None                      # type: CoroID
2907        self.in_work: bool = False
coro_id: int
in_work: bool
class InterfaceFakeAsyncAwait(Interface):
2938class InterfaceFakeAsyncAwait(Interface):
2939    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2940        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2941        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2942        self.coro_id: CoroID = None                      # type: CoroID
2943        self.in_work: bool = False
2944
2945    @overload
2946    async def __call__(self, service_request_type: Type['TypedServiceRequest[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2947
2948    @overload
2949    async def __call__(self, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2950
2951    @overload
2952    async def __call__(self, service_type: Type['TypedService[ServiceResponseTypeVar]'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2953
2954    @overload
2955    async def __call__(self, service_type: ServiceType, service_request: 'TypedServiceRequest[ServiceResponseTypeVar]') -> ServiceResponseTypeVar: ...
2956
2957    @overload
2958    async def __call__(self, service_request_type: Type['ServiceRequest'], *args, **kwargs) -> ServiceResponseTypeVar: ...
2959
2960    @overload
2961    async def __call__(self, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2962
2963    @overload
2964    async def __call__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
2965
2966    @overload
2967    async def __call__(self, service_type: ServiceType, service_request: 'ServiceRequest') -> ServiceResponseTypeVar: ...
2968
2969    async def __call__(self, service_type, *args, **kwargs):
2970        service_type, args, kwargs = self._normalize_call_args_kwargs(service_type, args, kwargs)
2971        return None
InterfaceFakeAsyncAwait( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro: CoroWrapperBase)
2939    def __init__(self, loop: CoroSchedulerBase, coro: CoroWrapperBase):
2940        self._loop: CoroSchedulerBase = None                        # type: CoroSchedulerBase
2941        self._coro: CoroWrapperBase = None                        # type: CoroWrapperBase
2942        self.coro_id: CoroID = None                      # type: CoroID
2943        self.in_work: bool = False
coro_id: int
in_work: bool
class CallerCoroInfo:
2974class CallerCoroInfo:
2975    def __init__(self, coro: CoroWrapperBase):
2976        self.coro: CoroWrapperBase = coro
2977        self.coro_id: CoroID = coro.coro_id
2978        self.coro_type: Type = type(coro)
CallerCoroInfo( coro: CoroWrapperBase)
2975    def __init__(self, coro: CoroWrapperBase):
2976        self.coro: CoroWrapperBase = coro
2977        self.coro_id: CoroID = coro.coro_id
2978        self.coro_type: Type = type(coro)
coro_id: int
coro_type: Type
class ServiceRequest:
2985class ServiceRequest:
2986    default_service_type: Optional[Type['Service']] = None
2987    default__request__type__: int = 0
2988
2989    def __init__(self):
2990        self.request_type: int = None  # type: Optional[int]
2991        self.args: Optional[Tuple] = None          # type: Optional[Tuple]
2992        self.kwargs: Optional[Dict] = None        # type: Optional[Dict]
2993        self.provide_to_request_handler: bool = False
2994
2995    def _save(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest':
2996        self.request_type = __request__type__
2997        self.args = args
2998        self.kwargs = kwargs
2999        return self
3000    
3001    def _copy(self) -> 'ServiceRequest':
3002        raise NotImplementedError
3003
3004    def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> 'ServiceRequest':
3005        return self._copy()._save(__request__type__, *args, **kwargs)
3006    
3007    def __call__(self, *args: Any, **kwds: Any) -> 'ServiceRequest':
3008        """should call self._save() with some default __request__type__. Required for the Interface(Type[ServiceRequest], *args, **kwargs) call. Must return self
3009
3010        Returns:
3011            ServiceRequest: Must return self
3012        """
3013        return self._save(self.default__request__type__, *args, **kwds)
3014    
3015    def interface(self) -> Any:
3016        current_interface()(self.default_service_type, self)
3017    
3018    i = interface
3019    
3020    async def async_interface(self) -> Any:
3021        await current_interface()(self.default_service_type, self)
3022    
3023    ai = async_interface
3024    
3025    def __repr__(self):
3026        return f'<{self.__class__.__name__}(request_type: {self.request_type}, args: {self.args}, kwargs: {self.kwargs})>'
default_service_type: Union[type[Service], NoneType] = None
default__request__type__: int = 0
request_type: int
args: Union[Tuple, NoneType]
kwargs: Union[Dict, NoneType]
provide_to_request_handler: bool
def interface(self) -> Any:
3015    def interface(self) -> Any:
3016        current_interface()(self.default_service_type, self)
def i(self) -> Any:
3015    def interface(self) -> Any:
3016        current_interface()(self.default_service_type, self)
async def async_interface(self) -> Any:
3020    async def async_interface(self) -> Any:
3021        await current_interface()(self.default_service_type, self)
async def ai(self) -> Any:
3020    async def async_interface(self) -> Any:
3021        await current_interface()(self.default_service_type, self)
class TypedServiceRequest(ServiceRequest, typing.Generic[~ServiceResponseTypeVar]):
3029class TypedServiceRequest(ServiceRequest, Generic[ServiceResponseTypeVar]):
3030    pass

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

class ServiceRequestMethodMixin:
3033class ServiceRequestMethodMixin:
3034    def __init__(self, service: 'Service') -> None:
3035        self.service = service
3036
3037    def __call__(self, *args: Any, **kwds: Any) -> Any:
3038        raise NotImplementedError
3039
3040    def full_processing_iteration(self):
3041        raise NotImplementedError
3042
3043    def in_work(self) -> bool:
3044        raise NotImplementedError
3045
3046    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
3047        raise NotImplementedError
ServiceRequestMethodMixin( service: Service)
3034    def __init__(self, service: 'Service') -> None:
3035        self.service = service
service
def full_processing_iteration(self):
3040    def full_processing_iteration(self):
3041        raise NotImplementedError
def in_work(self) -> bool:
3043    def in_work(self) -> bool:
3044        raise NotImplementedError
class DualImmediateProcessingServiceMixin:
3050class DualImmediateProcessingServiceMixin:
3051    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]:
3052        if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)):
3053            return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs)
3054        else:
3055            return self.single_task_registration_or_immediate_processing_single(*args, **kwargs)
3056
3057    def single_task_registration_or_immediate_processing_multiple(self, request: ServiceRequest
3058                                                         ) -> Tuple[bool, Any, None]:
3059        return self.resolve_request(request)
3060
3061    def single_task_registration_or_immediate_processing_single(
3062            self, *args, **kwargs
3063    ) -> Tuple[bool, Optional[CoroID], Any]:
3064        raise NotImplementedError
def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, NoneType]:
3051    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, Any, None]:
3052        if (len(args) == 1) and (len(kwargs) == 0) and (isinstance(args[0], ServiceRequest)) or ((len(args) == 0) and (len(kwargs) == 1) and ('request' in kwargs) and isinstance(kwargs['request'], ServiceRequest)):
3053            return self.single_task_registration_or_immediate_processing_multiple(*args, **kwargs)
3054        else:
3055            return self.single_task_registration_or_immediate_processing_single(*args, **kwargs)
def single_task_registration_or_immediate_processing_multiple( self, request: ServiceRequest) -> Tuple[bool, Any, NoneType]:
3057    def single_task_registration_or_immediate_processing_multiple(self, request: ServiceRequest
3058                                                         ) -> Tuple[bool, Any, None]:
3059        return self.resolve_request(request)
def single_task_registration_or_immediate_processing_single(self, *args, **kwargs) -> Tuple[bool, Union[int, NoneType], Any]:
3061    def single_task_registration_or_immediate_processing_single(
3062            self, *args, **kwargs
3063    ) -> Tuple[bool, Optional[CoroID], Any]:
3064        raise NotImplementedError
class WrongServiceRequestError(builtins.Exception):
2981class WrongServiceRequestError(Exception):
2982    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Service(Iterable):
3073class Service(Iterable):
3074    def __init__(self, loop: CoroSchedulerBase):
3075        super(Service, self).__init__()
3076        self._loop: CoroSchedulerBase = loop                     # type: CoroSchedulerBase
3077        # self._requests = list()               # type: List[Request]
3078        self._responses: List[Response] = list()              # type: List[Response]
3079        self.current_caller_coro_info: Optional[CallerCoroInfo] = None  # type: Optional[CallerCoroInfo]
3080        self._request_workers = dict()        # type: Dict[int, Callable]
3081
3082    def iteration(self) -> Optional[List[Response]]:
3083        # requests = self._requests
3084        # self._requests = type(self._requests)()
3085        self._responses = list()
3086        # for request in requests:
3087        #     self.current_caller_coro_info = CallerCoroInfo(request.coro_id)
3088        #     result_exists, result = \
3089        #         self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3090        #     if result_exists:
3091        #         self.register_response(request.coro_id, result)
3092        # self.current_caller_coro_info = None
3093        try:
3094            self.full_processing_iteration()
3095        except:
3096            # if __debug__: dlog(sys.exc_info())
3097            raise
3098        return self._responses
3099
3100    def make_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3101        return Response(coro_id, type(self), response, exception)
3102
3103    def register_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3104        self._responses.append(self.make_response(coro_id, response, exception))
3105
3106    def put_task(self, request: Request) -> Optional[Response]:
3107        self.current_caller_coro_info = CallerCoroInfo(request.coro)
3108        result_exists, result, exception = \
3109            self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3110        self.current_caller_coro_info = None
3111        if result_exists or exception:
3112            return self.make_response(request.coro_id, result, exception)
3113        return None
3114
3115    def resolve_request(self, request: ServiceRequest):
3116        try:
3117            if request.provide_to_request_handler:
3118                return self._request_workers[request.request_type](request, *request.args, **request.kwargs)
3119            else:
3120                return self._request_workers[request.request_type](*request.args, **request.kwargs)
3121        except:
3122            return True, None, get_exception()
3123
3124    def try_resolve_request(self, *args, **kwargs):
3125        possible_request: Optional[ServiceRequest] = None
3126        if (len(args) == 1) and (len(kwargs) == 0):
3127            possible_request = args[0]
3128        elif (len(kwargs) == 1) and (len(args) == 0):
3129            possible_request = kwargs.pop('request', None)
3130
3131        if possible_request is not None:
3132            if isinstance(possible_request, ServiceRequest):
3133                return self.resolve_request(possible_request)
3134
3135        return None
3136
3137    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> ServiceProcessingResponse:
3138        raise NotImplementedError
3139
3140    def full_processing_iteration(self):
3141        raise NotImplementedError
3142
3143    def in_work(self) -> bool:
3144        """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
3145
3146        Raises:
3147            NotImplementedError: _description_
3148
3149        Returns:
3150            bool: _description_
3151        """        
3152        raise NotImplementedError
3153    
3154    def in_forground_work(self) -> bool:
3155        return True
3156
3157    def thrifty_in_work(self, result: bool) -> bool:
3158        if result:
3159            return True
3160        else:
3161            self.make_dead()
3162            return False
3163    
3164    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
3165        return False, None
3166    
3167    def is_low_latency(self) -> bool:
3168        return False
3169    
3170    def make_live(self):
3171        self._loop.make_service_live_fast(type(self), self.is_low_latency())
3172    
3173    def make_dead(self):
3174        self._loop.make_service_dead_fast(type(self))
3175    
3176    @staticmethod
3177    def service_id_impl():
3178        return None
3179    
3180    @staticmethod
3181    def service_id(service_type: Type):
3182        service_id = service_type.service_id_impl()
3183        if service_id is None:
3184            service_id = service_type.__name__
3185        
3186        return service_id
3187    
3188    def destroy(self):
3189        pass
Service( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase)
3074    def __init__(self, loop: CoroSchedulerBase):
3075        super(Service, self).__init__()
3076        self._loop: CoroSchedulerBase = loop                     # type: CoroSchedulerBase
3077        # self._requests = list()               # type: List[Request]
3078        self._responses: List[Response] = list()              # type: List[Response]
3079        self.current_caller_coro_info: Optional[CallerCoroInfo] = None  # type: Optional[CallerCoroInfo]
3080        self._request_workers = dict()        # type: Dict[int, Callable]
current_caller_coro_info: Union[CallerCoroInfo, NoneType]
def iteration( self) -> Union[List[Response], NoneType]:
3082    def iteration(self) -> Optional[List[Response]]:
3083        # requests = self._requests
3084        # self._requests = type(self._requests)()
3085        self._responses = list()
3086        # for request in requests:
3087        #     self.current_caller_coro_info = CallerCoroInfo(request.coro_id)
3088        #     result_exists, result = \
3089        #         self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3090        #     if result_exists:
3091        #         self.register_response(request.coro_id, result)
3092        # self.current_caller_coro_info = None
3093        try:
3094            self.full_processing_iteration()
3095        except:
3096            # if __debug__: dlog(sys.exc_info())
3097            raise
3098        return self._responses

should return False if ready to stop looping :return:

def make_response( self, coro_id: int, response: Any, exception: Union[BaseException, NoneType] = None):
3100    def make_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3101        return Response(coro_id, type(self), response, exception)
def register_response( self, coro_id: int, response: Any, exception: Union[BaseException, NoneType] = None):
3103    def register_response(self, coro_id: CoroID, response: Any, exception: Optional[BaseException]=None):
3104        self._responses.append(self.make_response(coro_id, response, exception))
def put_task( self, request: Request) -> Union[Response, NoneType]:
3106    def put_task(self, request: Request) -> Optional[Response]:
3107        self.current_caller_coro_info = CallerCoroInfo(request.coro)
3108        result_exists, result, exception = \
3109            self.single_task_registration_or_immediate_processing(*request.args, **request.kwargs)
3110        self.current_caller_coro_info = None
3111        if result_exists or exception:
3112            return self.make_response(request.coro_id, result, exception)
3113        return None
def resolve_request( self, request: ServiceRequest):
3115    def resolve_request(self, request: ServiceRequest):
3116        try:
3117            if request.provide_to_request_handler:
3118                return self._request_workers[request.request_type](request, *request.args, **request.kwargs)
3119            else:
3120                return self._request_workers[request.request_type](*request.args, **request.kwargs)
3121        except:
3122            return True, None, get_exception()
def try_resolve_request(self, *args, **kwargs):
3124    def try_resolve_request(self, *args, **kwargs):
3125        possible_request: Optional[ServiceRequest] = None
3126        if (len(args) == 1) and (len(kwargs) == 0):
3127            possible_request = args[0]
3128        elif (len(kwargs) == 1) and (len(args) == 0):
3129            possible_request = kwargs.pop('request', None)
3130
3131        if possible_request is not None:
3132            if isinstance(possible_request, ServiceRequest):
3133                return self.resolve_request(possible_request)
3134
3135        return None
def single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
3137    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> ServiceProcessingResponse:
3138        raise NotImplementedError
def full_processing_iteration(self):
3140    def full_processing_iteration(self):
3141        raise NotImplementedError
def in_work(self) -> bool:
3143    def in_work(self) -> bool:
3144        """Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
3145
3146        Raises:
3147            NotImplementedError: _description_
3148
3149        Returns:
3150            bool: _description_
3151        """        
3152        raise NotImplementedError

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def in_forground_work(self) -> bool:
3154    def in_forground_work(self) -> bool:
3155        return True
def thrifty_in_work(self, result: bool) -> bool:
3157    def thrifty_in_work(self, result: bool) -> bool:
3158        if result:
3159            return True
3160        else:
3161            self.make_dead()
3162            return False
def time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
3164    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
3165        return False, None
def is_low_latency(self) -> bool:
3167    def is_low_latency(self) -> bool:
3168        return False
def make_live(self):
3170    def make_live(self):
3171        self._loop.make_service_live_fast(type(self), self.is_low_latency())
def make_dead(self):
3173    def make_dead(self):
3174        self._loop.make_service_dead_fast(type(self))
@staticmethod
def service_id_impl():
3176    @staticmethod
3177    def service_id_impl():
3178        return None
@staticmethod
def service_id(service_type: Type):
3180    @staticmethod
3181    def service_id(service_type: Type):
3182        service_id = service_type.service_id_impl()
3183        if service_id is None:
3184            service_id = service_type.__name__
3185        
3186        return service_id
def destroy(self):
3188    def destroy(self):
3189        pass
class TypedService(Service, typing.Generic[~ServiceResponseTypeVar]):
3192class TypedService(Service, Generic[ServiceResponseTypeVar]):
3193    pass

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

class CoroWrapperBase:
2092class CoroWrapperBase:
2093    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2094        if isinstance(worker, EntityArgsHolder):
2095            worker, args, kwargs = worker.entity_args_kwargs()
2096        
2097        self.worker = worker               # type: Worker
2098        self.init_args = args              # type: Tuple[Any, ...]
2099        self.init_kwargs = kwargs          # type: Dict
2100        self.coro_id: CoroID = coro_id             # type: CoroID
2101        self.loop: CoroSchedulerBase = loop                   # type: CoroSchedulerBase
2102        self.coro = None                   # type: Optional[Coro]
2103        self.parent_coro = None            # type: Optional[Coro]
2104        self.interface = None              # type: Optional[Interface]
2105        self.last_result = None            # type: Optional[Union[Request, Any]] # return value can be any
2106        self.exception = None              # type: Optional[Exception] # If there was an exception
2107        self.coro_on_del_handlers = set()  # type: Set[Callable]
2108        self._make_coro_method = self._raise_not_implemented_error  # type: Callable
2109        self._make_interface = self._raise_not_implemented_error  # type: Callable
2110        self._init_method = self._raise_not_implemented_error  # type: Callable
2111        self._call_method = self._raise_not_implemented_error  # type: Callable
2112        self._throw_method = self._raise_not_implemented_error  # type: Callable
2113        self._close_method = self._raise_not_implemented_error  # type: Callable
2114        self._current_call_method = self._call_method  # type: Callable
2115        self._is_background_coro: bool = False
2116        self.loop.foreground_coro_num += 1
2117    
2118    def _travers_through_coro_on_del_handlers(self, coro_exit_status: 'CoroExitStatus'):
2119        if not self.coro_on_del_handlers:
2120            return coro_exit_status
2121
2122        exception_properly_handled = coro_exit_status.properly_handled
2123        for handler in self.coro_on_del_handlers:
2124            exception_properly_handled = handler(self) or exception_properly_handled
2125        coro_exit_status.properly_handled = exception_properly_handled
2126        return coro_exit_status
2127
2128    def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']:
2129        self.parent_coro = parent_coro
2130        self.coro = self._make_coro_method()
2131        self.interface = self._make_interface()
2132        current_coro_interface_buff = self.loop.current_coro_interface
2133        self.loop.current_coro_interface = self.interface
2134        try:
2135            self._init_method(self.init_args, self.init_kwargs)
2136            if self:
2137                return None
2138            else:
2139                # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}')
2140                return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True))
2141        except:
2142            self.exception = get_exception()
2143            if not self.coro_on_del_handlers:
2144                return CoroExitStatus(self.exception, False)
2145            return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False))
2146        finally:
2147            self.loop.current_coro_interface = current_coro_interface_buff
2148
2149    def __call__(self, *args, **kwargs) -> Union[None, 'CoroExitStatus']:
2150        current_coro_interface_buff = self.loop.current_coro_interface
2151        self.loop.current_coro_interface = self.interface
2152        try:
2153            self._current_call_method(*args, **kwargs)
2154            if self:
2155                return None
2156            else:
2157                # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}')
2158                return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True))
2159        except:
2160            self.exception = get_exception()
2161            if not self.coro_on_del_handlers:
2162                return CoroExitStatus(self.exception, False)
2163            return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False))
2164        finally:
2165            self.loop.current_coro_interface = current_coro_interface_buff
2166    
2167    def __bool__(self) -> bool:
2168        raise NotImplementedError
2169
2170    def add_on_coro_del_handler(self, callback: OnCoroDelHandler):
2171        self.coro_on_del_handlers.add(callback)
2172    
2173    def _raise_not_implemented_error(self, *args, **kwargs):
2174        pass
2175        raise NotImplementedError
2176        return self._raise_not_implemented_error  # Suppressing lint error
2177    
2178    def _current_throw_method_helper(self, *args, **kwargs):
2179        self._throw_method(*args, **kwargs)
2180        self._current_call_method = self._call_method
2181    
2182    def request_throw(self) -> Any:
2183        self._current_call_method = self._current_throw_method_helper
2184    
2185    def _current_close_method_helper(self, *args, **kwargs):
2186        self._close_method()
2187        self._current_call_method = self._call_method
2188    
2189    def request_close(self) -> Any:
2190        self._current_call_method = self._current_close_method_helper
2191    
2192    @property
2193    def is_background_coro(self):
2194        return self._is_background_coro
2195    
2196    @is_background_coro.setter
2197    def is_background_coro(self, value: bool):
2198        if value:
2199            if not self._is_background_coro:
2200                self.loop.foreground_coro_num -= 1
2201        else:
2202            if self._is_background_coro:
2203                self.loop.foreground_coro_num += 1
2204        
2205        self._is_background_coro = value
2206
2207    def destroy(self):
2208        if self.interface is not None:
2209            if isinstance(self.interface, Interface):
2210                self.interface.destroy()
2211        
2212        if not self.is_background_coro:
2213            self.loop.foreground_coro_num -= 1
2214        
2215        self._is_background_coro = None
2216        self.init_args = None
2217        self.init_kwargs = None
2218        self.coro_id = None
2219        self.worker = None
2220        self.loop = None
2221        self.coro = None
2222        self.parent_coro = None
2223        self.interface = None
2224        self.last_result = None
2225        self.exception = None
2226        self.coro_on_del_handlers = None
2227        self._make_coro_method = None
2228        self._make_interface = None
2229        self._init_method = None
2230        self._call_method = None
2231        self._throw_method = None
2232        self._close_method = None
2233        self._current_call_method = None
CoroWrapperBase( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro_id: int, worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs)
2093    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2094        if isinstance(worker, EntityArgsHolder):
2095            worker, args, kwargs = worker.entity_args_kwargs()
2096        
2097        self.worker = worker               # type: Worker
2098        self.init_args = args              # type: Tuple[Any, ...]
2099        self.init_kwargs = kwargs          # type: Dict
2100        self.coro_id: CoroID = coro_id             # type: CoroID
2101        self.loop: CoroSchedulerBase = loop                   # type: CoroSchedulerBase
2102        self.coro = None                   # type: Optional[Coro]
2103        self.parent_coro = None            # type: Optional[Coro]
2104        self.interface = None              # type: Optional[Interface]
2105        self.last_result = None            # type: Optional[Union[Request, Any]] # return value can be any
2106        self.exception = None              # type: Optional[Exception] # If there was an exception
2107        self.coro_on_del_handlers = set()  # type: Set[Callable]
2108        self._make_coro_method = self._raise_not_implemented_error  # type: Callable
2109        self._make_interface = self._raise_not_implemented_error  # type: Callable
2110        self._init_method = self._raise_not_implemented_error  # type: Callable
2111        self._call_method = self._raise_not_implemented_error  # type: Callable
2112        self._throw_method = self._raise_not_implemented_error  # type: Callable
2113        self._close_method = self._raise_not_implemented_error  # type: Callable
2114        self._current_call_method = self._call_method  # type: Callable
2115        self._is_background_coro: bool = False
2116        self.loop.foreground_coro_num += 1
worker
init_args
init_kwargs
coro_id: int
loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase
coro
parent_coro
interface
last_result
exception
coro_on_del_handlers
def init( self, parent_coro: Union[greenlet.greenlet, Awaitable, Coroutine, Generator, AsyncGenerator, Callable, NoneType] = None) -> Union[NoneType, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroExitStatus]:
2128    def init(self, parent_coro: Optional[Coro] = None) -> Union[None, 'CoroExitStatus']:
2129        self.parent_coro = parent_coro
2130        self.coro = self._make_coro_method()
2131        self.interface = self._make_interface()
2132        current_coro_interface_buff = self.loop.current_coro_interface
2133        self.loop.current_coro_interface = self.interface
2134        try:
2135            self._init_method(self.init_args, self.init_kwargs)
2136            if self:
2137                return None
2138            else:
2139                # if __debug__: dlog(f'LAST_RESULT. Type: {type(self.last_result)}; Value: {self.last_result}')
2140                return self._travers_through_coro_on_del_handlers(CoroExitStatus(None, True))
2141        except:
2142            self.exception = get_exception()
2143            if not self.coro_on_del_handlers:
2144                return CoroExitStatus(self.exception, False)
2145            return self._travers_through_coro_on_del_handlers(CoroExitStatus(self.exception, False))
2146        finally:
2147            self.loop.current_coro_interface = current_coro_interface_buff
def add_on_coro_del_handler( self, callback: collections.abc.Callable[CoroWrapperBase, bool]):
2170    def add_on_coro_del_handler(self, callback: OnCoroDelHandler):
2171        self.coro_on_del_handlers.add(callback)
def request_throw(self) -> Any:
2182    def request_throw(self) -> Any:
2183        self._current_call_method = self._current_throw_method_helper
def request_close(self) -> Any:
2189    def request_close(self) -> Any:
2190        self._current_call_method = self._current_close_method_helper
is_background_coro
2192    @property
2193    def is_background_coro(self):
2194        return self._is_background_coro
def destroy(self):
2207    def destroy(self):
2208        if self.interface is not None:
2209            if isinstance(self.interface, Interface):
2210                self.interface.destroy()
2211        
2212        if not self.is_background_coro:
2213            self.loop.foreground_coro_num -= 1
2214        
2215        self._is_background_coro = None
2216        self.init_args = None
2217        self.init_kwargs = None
2218        self.coro_id = None
2219        self.worker = None
2220        self.loop = None
2221        self.coro = None
2222        self.parent_coro = None
2223        self.interface = None
2224        self.last_result = None
2225        self.exception = None
2226        self.coro_on_del_handlers = None
2227        self._make_coro_method = None
2228        self._make_interface = None
2229        self._init_method = None
2230        self._call_method = None
2231        self._throw_method = None
2232        self._close_method = None
2233        self._current_call_method = None
class CoroWrapperGreenlet(CoroWrapperBase):
2236class CoroWrapperGreenlet(CoroWrapperBase):
2237    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2238        super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs)
2239        self._make_coro_method = self._make_coro_method_imp
2240        self._make_interface = self._make_interface_imp
2241        self._init_method = self._init_method_imp
2242        self._call_method = self._call_method_imp
2243        self._throw_method = self._throw_method_imp  # type: Callable
2244        self._close_method = self._close_method_imp  # type: Callable
2245        self._current_call_method = self._call_method  # type: Callable
2246    
2247    def _make_coro_method_imp(self):
2248        return greenlet(self.worker, self.parent_coro)
2249    
2250    def _make_interface_imp(self):
2251        return InterfaceGreenlet(self.loop, self)
2252
2253    def _init_method_imp(self, init_args, init_kwargs):
2254        try:
2255            # if __debug__: dlog(f'λ => (init): {func_info(self.worker.worker)}')
2256            self.last_result = self.coro.switch(self.interface, *init_args, **init_kwargs)  # TODO: wrong
2257            # if __debug__: dlog(f'λ <= (init): {repr(self.worker.worker)}')
2258        except GreenletExit:
2259            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2260            self.last_result = self.interface.result
2261        except:
2262            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2263            self.last_result = None
2264            raise
2265
2266    def _call_method_imp(self, *args, **kwargs):
2267        try:
2268            # if __debug__: dlog(f'λ => (call): {func_info(self.worker.worker)}')
2269            self.last_result = self.coro.switch(*args, **kwargs)  # TODO: wrong
2270            # if __debug__: dlog(f'λ <= (call): {repr(self.worker.worker)}')
2271        except GreenletExit:
2272            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2273            self.last_result = self.interface.result
2274        except:
2275            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2276            self.last_result = None
2277            raise
2278
2279    # def _throw_method_imp(self, *args, **kwargs):
2280    def _throw_method_imp(self, ex_type, ex_value=None, ex_traceback=None):
2281        try:
2282            # if __debug__: dlog(f'λ => (throw): {func_info(self.worker.worker)}')
2283            self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback)
2284            # if __debug__: dlog(f'λ <= (throw): {repr(self.worker.worker)}')
2285        except GreenletExit:
2286            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2287            self.last_result = self.interface.result
2288        except:
2289            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2290            self.last_result = None
2291            raise
2292
2293    def _close_method_imp(self):
2294        try:
2295            # if __debug__: dlog(f'λ => (close): {func_info(self.worker.worker)}')
2296            self.last_result = self.coro.throw()
2297            # if __debug__: dlog(f'λ <= (close): {repr(self.worker.worker)}')
2298        except GreenletExit:
2299            # if __debug__: dlog(f'λ <= (init; GreenletExit): {repr(self.worker.worker)}')
2300            self.last_result = self.interface.result
2301        except:
2302            # if __debug__: dlog(f'λ <= (init; Exception): {repr(self.worker.worker)}')
2303            self.last_result = None
2304            raise
2305    
2306    def __bool__(self) -> bool:
2307        return bool(self.coro)
2308
2309    def destroy(self):
2310        if isinstance(self.worker, GreenletWorkerWrapper):
2311            self.worker.destroy()
2312        
2313        return super().destroy()
CoroWrapperGreenlet( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro_id: int, worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs)
2237    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2238        super(CoroWrapperGreenlet, self).__init__(loop, coro_id, worker, *args, **kwargs)
2239        self._make_coro_method = self._make_coro_method_imp
2240        self._make_interface = self._make_interface_imp
2241        self._init_method = self._init_method_imp
2242        self._call_method = self._call_method_imp
2243        self._throw_method = self._throw_method_imp  # type: Callable
2244        self._close_method = self._close_method_imp  # type: Callable
2245        self._current_call_method = self._call_method  # type: Callable
def destroy(self):
2309    def destroy(self):
2310        if isinstance(self.worker, GreenletWorkerWrapper):
2311            self.worker.destroy()
2312        
2313        return super().destroy()
class CoroWrapperAsyncAwait(CoroWrapperBase):
2370class CoroWrapperAsyncAwait(CoroWrapperBase):
2371    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2372        super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs)
2373        self._make_coro_method = self._make_coro_method_imp
2374        self._make_interface = self._make_interface_imp
2375        self.in_run_state = False
2376        self.subtype = self._setup_subtype()
2377        self._current_call_method = self._call_method  # type: Callable
2378
2379    def _make_coro_method_imp(self):
2380        return self.worker
2381    
2382    def _make_interface_imp(self):
2383        return InterfaceAsyncAwait(self.loop, self)
2384
2385    def __bool__(self) -> bool:
2386        return bool(self.in_run_state)
2387    
2388    def _setup_subtype(self):
2389        if inspect.iscoroutine(self.worker):
2390            # if __debug__: dlog('λ >>>\tCOROUTINE')
2391            self._init_method = self._init_coroutine
2392            self._call_method = self._call_coroutine
2393            self._throw_method = self._throw_coroutine
2394            self._close_method = self._close_coroutine
2395            return 0
2396        elif inspect.isgenerator(self.worker):
2397            # if __debug__: dlog('λ >>>\tGENERATOR')
2398            self._init_method = self._init_generator
2399            self._call_method = self._call_coroutine
2400            self._throw_method = self._throw_coroutine
2401            self._close_method = self._close_coroutine
2402            return 1
2403        elif inspect.iscoroutinefunction(self.worker):
2404            # if __debug__: dlog('λ >>>\tCOROUTINE FUNCTION')
2405            self._init_method = self._init_coroutinefunction
2406            self._call_method = self._call_coroutine
2407            self._throw_method = self._throw_coroutine
2408            self._close_method = self._close_coroutine
2409            return 2
2410        elif inspect.isgeneratorfunction(self.worker):
2411            # if __debug__: dlog('λ >>>\tGENERATOR FUNCTION')
2412            self._init_method = self._init_generatorfunction
2413            self._call_method = self._call_coroutine
2414            self._throw_method = self._throw_coroutine
2415            self._close_method = self._close_coroutine
2416            return 3
2417        elif inspect.isasyncgen(self.worker):
2418            # if __debug__: dlog('λ >>>\tASYNC GENERATOR')
2419            self._init_method = self._init_asyncgenerator
2420            self._call_method = self._call_asyncgenerator
2421            self._throw_method = self._throw_asyncgenerator
2422            self._close_method = self._close_asyncgenerator
2423            return 4
2424        elif inspect.isasyncgenfunction(self.worker):
2425            # if __debug__: dlog('λ >>>\tASYNC GENERATOR FUNCTION')
2426            self._init_method = self._init_asyncgeneratorfunction
2427            self._call_method = self._call_asyncgenerator
2428            self._throw_method = self._throw_asyncgenerator
2429            self._close_method = self._close_asyncgenerator
2430            return 5
2431        elif inspect.isawaitable(self.worker):
2432            # if __debug__: dlog('λ >>>\tAWAITABLE')
2433            self._init_method = self._init_awaitable
2434            self._call_method = self._call_coroutine
2435            self._throw_method = self._throw_coroutine
2436            self._close_method = self._close_coroutine
2437            return 6
2438        elif callable(self.worker):
2439            # if __debug__: dlog('λ >>>\tCALLABLE')
2440            self._init_method = self._init_callable
2441            self._call_method = self._call_coroutine
2442            self._throw_method = self._throw_coroutine
2443            self._close_method = self._close_coroutine
2444            return 7
2445        else:
2446            raise TypeError(f'{self.worker} is neither an awaitable nor a wrapper for an awaitable')
2447    
2448    def _init_coroutine(self, init_args, init_kwargs):
2449        try:
2450            self.in_run_state = True
2451            self.last_result = self.coro.send(None)
2452        except StopIteration as ex:
2453            self.in_run_state = False
2454            self.last_result = ex.value
2455        except:
2456            self.in_run_state = False
2457            raise
2458            
2459    def _call_coroutine(self, *args, **kwargs):
2460        try:
2461            self.in_run_state = True
2462            self.last_result = self.coro.send(*args, **kwargs)
2463        except StopIteration as ex:
2464            self.in_run_state = False
2465            self.last_result = ex.value
2466        except:
2467            self.in_run_state = False
2468            raise
2469    
2470    if sys.version_info >= (3, 12):
2471    # if (3, 12) <= PYTHON_VERSION_INT:
2472        def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None):
2473            try:
2474                self.in_run_state = True
2475                if ex_value is None:
2476                    ex_value = ex_type()
2477                
2478                self.last_result = self.coro.throw(ex_value)  # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python.
2479            except StopIteration as ex:
2480                self.in_run_state = False
2481                self.last_result = ex.value
2482            except:
2483                self.in_run_state = False
2484                raise
2485    else:
2486        def _throw_coroutine(self, ex_type, ex_value=None, ex_traceback=None):
2487            try:
2488                self.in_run_state = True
2489                self.last_result = self.coro.throw(ex_type, ex_value, ex_traceback)  # Changed in version 3.12: The second signature (type[, value[, traceback]]) is deprecated and may be removed in a future version of Python.
2490            except StopIteration as ex:
2491                self.in_run_state = False
2492                self.last_result = ex.value
2493            except:
2494                self.in_run_state = False
2495                raise
2496            
2497    def _close_coroutine(self, *args, **kwargs):
2498        try:
2499            self.in_run_state = True
2500            self.last_result = self.coro.close()
2501            self.in_run_state = False
2502        except GeneratorExit as ex:
2503            self.in_run_state = False
2504        except:
2505            self.in_run_state = False
2506            raise
2507
2508    def _init_generator(self, init_args, init_kwargs):
2509        try:
2510            self.in_run_state = True
2511            self.last_result = next(self.coro)  # ToDo: investigate how to provide an initial parameters
2512        except StopIteration as ex:
2513            self.in_run_state = False
2514            self.last_result = ex.value
2515        except:
2516            self.in_run_state = False
2517            raise
2518
2519    def _init_coroutinefunction(self, init_args, init_kwargs):
2520        self.coro = self.coro(self.interface, *init_args, **init_kwargs)
2521        self._init_coroutine(None, None)
2522
2523    def _init_generatorfunction(self, init_args, init_kwargs):
2524        self.coro = self.coro(self.interface, *init_args, **init_kwargs)
2525        self._init_generator(None, None)
2526
2527    def _init_asyncgenerator(self, init_args, init_kwargs):
2528        try:
2529            self.in_run_state = True
2530            entity = init_asyncgenerator(self.coro)
2531            result = entity.send(None)
2532        except StopIteration as ex:
2533            self.in_run_state = False
2534            result = ex.value
2535        except:
2536            self.in_run_state = False
2537            raise
2538        
2539        self.last_result, exception = result
2540        if exception is not None:
2541            self.in_run_state = False
2542            if not isinstance(exception, StopAsyncIteration):
2543                raise exception
2544
2545    def _call_asyncgenerator(self, *args, **kwargs):
2546        try:
2547            self.in_run_state = True
2548            entity = call_asyncgenerator(self.coro, *args, **kwargs)
2549            result = entity.send(None)
2550        except StopIteration as ex:
2551            self.in_run_state = False
2552            result = ex.value
2553        except:
2554            self.in_run_state = False
2555            raise
2556        
2557        self.last_result, exception = result
2558        if exception is not None:
2559            self.in_run_state = False
2560            if not isinstance(exception, StopAsyncIteration):
2561                raise exception
2562
2563    def _throw_asyncgenerator(self, *args, **kwargs):
2564        try:
2565            self.in_run_state = True
2566            entity = throw_asyncgenerator(self.coro, *args, **kwargs)
2567            result = entity.send(None)
2568        except StopIteration as ex:
2569            self.in_run_state = False
2570            result = ex.value
2571        except:
2572            self.in_run_state = False
2573            raise
2574        
2575        self.last_result, exception = result
2576        if exception is not None:
2577            self.in_run_state = False
2578            if not isinstance(exception, StopAsyncIteration):
2579                raise exception
2580
2581    def _close_asyncgenerator(self):
2582        try:
2583            self.in_run_state = True
2584            entity = close_asyncgenerator(self.coro)
2585            result = entity.send(None)
2586        except StopIteration as ex:  # TODO: check maybe `GeneratorExit` will be raised
2587            self.in_run_state = False
2588            result = ex.value
2589        except:
2590            self.in_run_state = False
2591            raise
2592        
2593        self.last_result, exception = result
2594        if exception is not None:
2595            self.in_run_state = False
2596            if not isinstance(exception, StopAsyncIteration):
2597                raise exception
2598
2599    def _init_asyncgeneratorfunction(self, init_args, init_kwargs):
2600        try:
2601            self.in_run_state = True
2602            entity = init_asyncgeneratorfunction(self.coro, self.interface, *init_args, **init_kwargs)
2603            result = entity.send(None)
2604        except StopIteration as ex:
2605            self.in_run_state = False
2606            result = ex.value
2607        except:
2608            self.in_run_state = False
2609            raise
2610        
2611        self.coro, self.last_result, exception = result
2612        if exception is not None:
2613            self.in_run_state = False
2614            if not isinstance(exception, StopAsyncIteration):
2615                raise exception
2616
2617    def _init_awaitable(self, init_args, init_kwargs):
2618        self.coro = awaitable_wrapper(self.coro)
2619        self._init_coroutine()
2620
2621    def _init_callable(self, init_args, init_kwargs):
2622        self.coro = callable_wrapper(self.coro, self.interface, *init_args, **init_kwargs)
2623        self._init_coroutine()
2624
2625    def destroy(self):
2626        self._make_coro_method = None
2627        self._make_interface = None
2628        self.in_run_state = None
2629        self.subtype = None
2630        return super().destroy()
CoroWrapperAsyncAwait( loop: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerBase, coro_id: int, worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]], *args, **kwargs)
2371    def __init__(self, loop: CoroSchedulerBase, coro_id: CoroID, worker: Worker, *args, **kwargs):
2372        super(CoroWrapperAsyncAwait, self).__init__(loop, coro_id, worker, *args, **kwargs)
2373        self._make_coro_method = self._make_coro_method_imp
2374        self._make_interface = self._make_interface_imp
2375        self.in_run_state = False
2376        self.subtype = self._setup_subtype()
2377        self._current_call_method = self._call_method  # type: Callable
in_run_state
subtype
def destroy(self):
2625    def destroy(self):
2626        self._make_coro_method = None
2627        self._make_interface = None
2628        self.in_run_state = None
2629        self.subtype = None
2630        return super().destroy()
def dlog(*args, **kwargs):
282def _fake_debug_log(*args, **kwargs):
283    pass
def log_exception_traceback_info():
291def log_exception_traceback_info():
292    exception = sys.exc_info()
293    formattedTraceback = traceback.format_exception(exception[0], exception[1], exception[2])
294    exception = exception[:2] + (formattedTraceback,)
295    trace = ''
296    for line in exception[2]:
297        trace += line
298    if __debug__: dlog(trace, file=sys.stderr)
299    if __debug__: dlog(exception[0])
300    if __debug__: dlog(exception[1])
@contextmanager
def log_uncatched_exception():
303@contextmanager
304def log_uncatched_exception():
305    try:
306        yield
307    except:
308        log_exception_traceback_info()
309        raise
def func_info(func, full_name: Union[bool, NoneType] = True):
312def func_info(func, full_name: Optional[bool]=True):
313    if full_name:
314        # return f'{func.__class__}({func.__module__}.{func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}'
315        return f'{func.__class__}({func.__qualname__}) @ {func.__code__.co_filename}:{func.__code__.co_firstlineno}'
316    else:
317        return f'{func.__class__}({func.__qualname__}) @ {os.path.basename(func.__code__.co_filename)}:{func.__code__.co_firstlineno}'
ServiceProcessingResultExists = <class 'bool'>
ServiceProcessingResult = typing.Any
ServiceProcessingException = typing.Union[BaseException, NoneType]
ServiceProcessingResponse = typing.Tuple[bool, typing.Any, typing.Union[BaseException, NoneType]]
def full_func_info_to_dict(my):
347def full_func_info_to_dict(my):
348    return {
349        'repr': repr(my), 
350        'module': str(my.__module__), 
351        'name': str(my.__name__), 
352        'qualname': str(my.__qualname__), 
353        'annotations': str(my.__annotations__), 
354        'class': str(my.__class__), 
355        'closure': str(my.__closure__), 
356        'code': str(my.__code__), 
357        'co_argcount': str(my.__code__.co_argcount),
358        'co_cellvars': str(my.__code__.co_cellvars),
359        'co_code': str(my.__code__.co_code),
360        'co_consts': str(my.__code__.co_consts),
361        'co_filename': str(my.__code__.co_filename),
362        'co_firstlineno': str(my.__code__.co_firstlineno),
363        'co_flags': str(my.__code__.co_flags),
364        'co_freevars': str(my.__code__.co_freevars),
365        'co_kwonlyargcount': str(my.__code__.co_kwonlyargcount),
366        'co_lnotab': str(my.__code__.co_lnotab),
367        'co_name': str(my.__code__.co_name),
368        'co_names': str(my.__code__.co_names),
369        'co_nlocals': str(my.__code__.co_nlocals),
370        'co_stacksize': str(my.__code__.co_stacksize),
371        'co_varnames': str(my.__code__.co_varnames),
372    }
def full_func_info_to_printable_dict(func: Callable) -> Dict[str, str]:
375def full_func_info_to_printable_dict(func: Callable) -> Dict[str, str]:
376    func_info: Dict[str, str] = full_func_info_to_dict(func)
377    good_keys = {'module', 'qualname', 'class', 'co_filename', 'co_firstlineno'}
378    result: Dict[str, str] = dict()
379    for key, value in func_info.items():
380        if key in good_keys:
381            result[key] = value
382    
383    return result
class GreenletWorkerWrapper:
2633class GreenletWorkerWrapper:
2634    def __init__(self, worker: Worker):
2635        self.worker = worker
2636        
2637    def __call__(self, interface: 'InterfaceGreenlet', *args, **kwargs):
2638        result = None
2639        try:
2640            result = self.worker(interface, *args, **kwargs)
2641        finally:
2642            interface.register_result(result)  # in case if GreenletExit will be raised by greenlet framework
2643        return result
2644
2645    def destroy(self):
2646        self.worker = None
GreenletWorkerWrapper( worker: Union[collections.abc.Callable[Interface, Any], collections.abc.Callable[Interface, Awaitable[Any]]])
2634    def __init__(self, worker: Worker):
2635        self.worker = worker
worker
def destroy(self):
2645    def destroy(self):
2646        self.worker = None
class EntityStatsMixin:
857class EntityStatsMixin:
858    class StatsLevel(Enum):
859        info = 0
860        debug = 1
861    
862    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel') -> Tuple[str, Dict[str, Any]]:
863        raise NotImplementedError
def get_entity_stats( self, stats_level: EntityStatsMixin.StatsLevel) -> Tuple[str, Dict[str, Any]]:
862    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel') -> Tuple[str, Dict[str, Any]]:
863        raise NotImplementedError
class EntityStatsMixin.StatsLevel(enum.Enum):
858    class StatsLevel(Enum):
859        info = 0
860        debug = 1

An enumeration.

info = <StatsLevel.info: 0>
debug = <StatsLevel.debug: 1>
Inherited Members
enum.Enum
name
value
class CoroSchedulerIsCurrentlyDestroingError(builtins.Exception):
866class CoroSchedulerIsCurrentlyDestroingError(Exception):
867    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CoroSchedulerDestroyException(builtins.Exception):
870class CoroSchedulerDestroyException(Exception):
871    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CoroSchedulerDestroyRequestedException(builtins.Exception):
874class CoroSchedulerDestroyRequestedException(Exception):
875    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
greenlet_awailable: bool = True
class GreenletExit(builtins.BaseException):

Common base class for all exceptions

Inherited Members
builtins.BaseException
BaseException
with_traceback
args