cengal.parallel_execution.coroutines.coro_standard_services.run_coro.versions.v_0.run_coro

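Defines the RunCoro service and the helpers arun_coro_fast, run_coro_fast, arun_coro and run_coro. The async helpers await a worker directly when its detected type is CoroType.awaitable, the sync helpers call a worker directly when its detected type is CoroType.greenlet; in every other case the worker is passed to the RunCoro service through the interface.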

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['RunCoro', 'arun_coro_fast', 'run_coro_fast', 'arun_coro', 'run_coro']
 38
 39from cengal.parallel_execution.coroutines.coro_scheduler import *
 40from cengal.introspection.inspect import get_exception, get_exception_tripple
 41from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 42from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_current_from_other_service
 43from cengal.code_flow_control.smart_values import ValueExistence
 44from typing import Any, Optional, Tuple, Dict, Set, Union, List
 45import sys
 46
 47
 48class RunCoro(ServiceWithADirectRequestMixin, TypedService[Any]):
 49    def __init__(self, loop: CoroSchedulerType):
 50        super(RunCoro, self).__init__(loop)
 51        self.called_by: Dict[CoroID, CoroID] = dict()  # Dict[CoroID, CoroID] # key - callable; value - requester
 52        self.results: List[Tuple[CoroID, Any, Optional[BaseException]]] = list()  # (id, result, exception)
 53        self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()  # (id, result, exception)
 54    
 55    def single_task_registration_or_immediate_processing(
 56            self, coro_worker: AnyWorker, *args, **kwargs) -> Tuple[bool, Any, Optional[BaseException]]:
 57        requester_id = self.current_caller_coro_info.coro_id
 58        try:
 59            coro: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
 60        except:
 61            ex_type, exception, tracback = get_exception_tripple()
 62            if __debug__: dlog(ex_type, exception, tracback)
 63            exception = exception.with_traceback(tracback)
 64            return True, None, exception
 65        
 66        coro.add_on_coro_del_handler(self._on_coro_del_handler)
 67        self.called_by[coro.coro_id] = requester_id
 68        return False, None, None
 69
 70    def full_processing_iteration(self):
 71        for coro_id, result_and_exception in self.results.items():
 72            result, exception = result_and_exception
 73            self.register_response(self.called_by[coro_id], result, exception)
 74            del self.called_by[coro_id]
 75        
 76        self.results = type(self.results)()
 77        self.make_dead()
 78
 79    def in_work(self) -> bool:
 80        result: bool = bool(self.results)
 81        return self.thrifty_in_work(result)
 82
 83    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
 84        if coro.coro_id in self.results:
 85            raise RuntimeError(f'CoroWrapperBase is already in results list. {coro.coro_id=}, {coro.last_result=}, {coro.exception=}')
 86        
 87        self.results[coro.coro_id] = (coro.last_result, coro.exception)
 88        self.make_live()
 89        return True
 90
 91
 92async def arun_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
 93    coro_type: CoroType = find_coro_type(coro_worker)
 94    if CoroType.awaitable == coro_type:
 95        return await coro_worker(i, *args, **kwargs)
 96    else:
 97        return await i(RunCoro, coro_worker, *args, **kwargs)
 98
 99
def run_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from a non-async coroutine and return its result.

    A worker whose detected type is ``CoroType.greenlet`` is called directly
    with the interface ``i``; any other worker is passed to the ``RunCoro``
    service through ``i``.
    """
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == worker_kind:
        return coro_worker(i, *args, **kwargs)

    return i(RunCoro, coro_worker, *args, **kwargs)
106
107
async def arun_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from an async coroutine using the interface from ``current_interface()``.

    A worker whose detected type is ``CoroType.awaitable`` is awaited directly
    with that interface; any other worker is passed to the ``RunCoro`` service
    through it.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable == worker_kind:
        return await coro_worker(interface, *args, **kwargs)

    return await interface(RunCoro, coro_worker, *args, **kwargs)
115
116
def run_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from a non-async coroutine using the interface from ``current_interface()``.

    A worker whose detected type is ``CoroType.greenlet`` is called directly
    with that interface; any other worker is passed to the ``RunCoro`` service
    through it.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == worker_kind:
        return coro_worker(interface, *args, **kwargs)

    return interface(RunCoro, coro_worker, *args, **kwargs)
class RunCoro(cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[typing.Any]):
class RunCoro(ServiceWithADirectRequestMixin, TypedService[Any]):
    """Service that starts a worker as a separate coroutine and answers the requester with its outcome.

    A request puts ``coro_worker`` into the scheduler via
    ``put_current_from_other_service`` and remembers which coroutine asked for
    it. A handler registered with ``add_on_coro_del_handler`` stores the
    started coroutine's ``last_result`` and ``exception``; the next
    ``full_processing_iteration`` forwards them to the requester through
    ``register_response``.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(RunCoro, self).__init__(loop)
        # key - id of the started coroutine; value - id of the requester coroutine
        self.called_by: Dict[CoroID, CoroID] = dict()
        # key - id of the started coroutine; value - (result, exception).
        # Only the dict form is ever used (``.items()``, ``in``, key assignment),
        # so the former throw-away ``list()`` initialisation was dropped.
        self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()

    def single_task_registration_or_immediate_processing(
            self, coro_worker: AnyWorker, *args, **kwargs) -> Tuple[bool, Any, Optional[BaseException]]:
        """Start ``coro_worker`` as a new coroutine on behalf of the current caller.

        Args:
            coro_worker: worker to start; forwarded with ``*args``/``**kwargs``
                to ``put_current_from_other_service``.

        Returns:
            ``(True, None, exception)`` when starting the worker raised, or
            ``(False, None, None)`` once the coroutine has been started and its
            requester recorded.
        """
        # Read the requester before starting the new coroutine.
        requester_id = self.current_caller_coro_info.coro_id
        try:
            coro: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
        except:
            # NOTE(review): the bare except is kept as in the original; the declared
            # return type carries BaseException, so this looks deliberate - confirm.
            exc_type, exc, tb = get_exception_tripple()
            if __debug__: dlog(exc_type, exc, tb)
            return True, None, exc.with_traceback(tb)

        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        self.called_by[coro.coro_id] = requester_id
        return False, None, None

    def full_processing_iteration(self):
        """Forward every stored outcome to its requester, then reset ``results`` and call ``make_dead()``."""
        for coro_id, (result, exception) in self.results.items():
            self.register_response(self.called_by[coro_id], result, exception)
            del self.called_by[coro_id]

        # Fresh empty container of the same type as the current one.
        self.results = type(self.results)()
        self.make_dead()

    def in_work(self) -> bool:
        """Return ``thrifty_in_work()`` applied to whether any outcomes are waiting in ``results``."""
        has_pending: bool = bool(self.results)
        return self.thrifty_in_work(has_pending)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Store the outcome of ``coro`` and call ``make_live()``.

        Raises:
            RuntimeError: if an outcome for ``coro.coro_id`` is already stored.

        Returns:
            Always ``True``.
        """
        if coro.coro_id in self.results:
            raise RuntimeError(f'CoroWrapperBase is already in results list. {coro.coro_id=}, {coro.last_result=}, {coro.exception=}')

        self.results[coro.coro_id] = (coro.last_result, coro.exception)
        self.make_live()
        return True

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

RunCoro( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
50    def __init__(self, loop: CoroSchedulerType):
51        super(RunCoro, self).__init__(loop)
52        self.called_by: Dict[CoroID, CoroID] = dict()  # Dict[CoroID, CoroID] # key - callable; value - requester
53        self.results: List[Tuple[CoroID, Any, Optional[BaseException]]] = list()  # (id, result, exception)
54        self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()  # (id, result, exception)
called_by: Dict[int, int]
results: Dict[int, Tuple[Any, Union[BaseException, NoneType]]]
def single_task_registration_or_immediate_processing( self, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
56    def single_task_registration_or_immediate_processing(
57            self, coro_worker: AnyWorker, *args, **kwargs) -> Tuple[bool, Any, Optional[BaseException]]:
58        requester_id = self.current_caller_coro_info.coro_id
59        try:
60            coro: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
61        except:
62            ex_type, exception, tracback = get_exception_tripple()
63            if __debug__: dlog(ex_type, exception, tracback)
64            exception = exception.with_traceback(tracback)
65            return True, None, exception
66        
67        coro.add_on_coro_del_handler(self._on_coro_del_handler)
68        self.called_by[coro.coro_id] = requester_id
69        return False, None, None
def full_processing_iteration(self):
71    def full_processing_iteration(self):
72        for coro_id, result_and_exception in self.results.items():
73            result, exception = result_and_exception
74            self.register_response(self.called_by[coro_id], result, exception)
75            del self.called_by[coro_id]
76        
77        self.results = type(self.results)()
78        self.make_dead()
def in_work(self) -> bool:
80    def in_work(self) -> bool:
81        result: bool = bool(self.results)
82        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

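Raises: NotImplementedError: presumably raised by the base-class implementation when a subclass does not override this method (RunCoro does override it) — confirm against Service.in_work.

Returns: bool: for RunCoro, the value returned by thrifty_in_work() when given whether any finished-coroutine results are waiting to be delivered.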

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
async def arun_coro_fast( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> Any:
async def arun_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from an async coroutine and return its result.

    A worker whose detected type is ``CoroType.awaitable`` is awaited directly
    with the interface ``i``; any other worker is passed to the ``RunCoro``
    service through ``i``.
    """
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable == worker_kind:
        return await coro_worker(i, *args, **kwargs)

    return await i(RunCoro, coro_worker, *args, **kwargs)
def run_coro_fast( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> Any:
def run_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from a non-async coroutine and return its result.

    A worker whose detected type is ``CoroType.greenlet`` is called directly
    with the interface ``i``; any other worker is passed to the ``RunCoro``
    service through ``i``.
    """
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == worker_kind:
        return coro_worker(i, *args, **kwargs)

    return i(RunCoro, coro_worker, *args, **kwargs)
async def arun_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> Any:
async def arun_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from an async coroutine using the interface from ``current_interface()``.

    A worker whose detected type is ``CoroType.awaitable`` is awaited directly
    with that interface; any other worker is passed to the ``RunCoro`` service
    through it.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable == worker_kind:
        return await coro_worker(interface, *args, **kwargs)

    return await interface(RunCoro, coro_worker, *args, **kwargs)
def run_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> Any:
def run_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from a non-async coroutine using the interface from ``current_interface()``.

    A worker whose detected type is ``CoroType.greenlet`` is called directly
    with that interface; any other worker is passed to the ``RunCoro`` service
    through it.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == worker_kind:
        return coro_worker(interface, *args, **kwargs)

    return interface(RunCoro, coro_worker, *args, **kwargs)