cengal.parallel_execution.coroutines.coro_standard_services.run_coro.versions.v_0.run_coro
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
RunCoro service and its helper functions.

``RunCoro`` hands a worker to ``put_current_from_other_service``, remembers
which coroutine requested it and, once the resulting coroutine is deleted,
registers its last result or exception as the response for the requester.

``run_coro`` / ``run_coro_fast`` are the plain-call helpers and
``arun_coro`` / ``arun_coro_fast`` are the awaitable ones. Each helper calls the
worker directly when its coroutine type matches the calling style and falls
back to a ``RunCoro`` service request otherwise.
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['RunCoro', 'arun_coro_fast', 'run_coro_fast', 'arun_coro', 'run_coro']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.introspection.inspect import get_exception, get_exception_tripple
from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_current_from_other_service
from cengal.code_flow_control.smart_values import ValueExistence
from typing import Any, Optional, Tuple, Dict, Set, Union, List
import sys


class RunCoro(ServiceWithADirectRequestMixin, TypedService[Any]):
    """Service that runs a worker as a coroutine and reports its outcome to the requester.

    A request hands the worker to ``put_current_from_other_service``. When the
    resulting coroutine is deleted, its ``last_result`` and ``exception`` are
    stored and later registered as the response for the requesting coroutine.
    """

    def __init__(self, loop: CoroSchedulerType):
        """Create the service with empty bookkeeping tables.

        Args:
            loop: scheduler instance passed through to the base class initializer.
        """
        super(RunCoro, self).__init__(loop)
        # key - id of the coroutine created for a request; value - id of the requester
        self.called_by: Dict[CoroID, CoroID] = dict()
        # key - id of a deleted coroutine; value - (result, exception) waiting for delivery.
        # The former list-typed assignment was dead code: it was overwritten immediately.
        self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()

    def single_task_registration_or_immediate_processing(
            self, coro_worker: AnyWorker, *args, **kwargs) -> Tuple[bool, Any, Optional[BaseException]]:
        """Create a coroutine for ``coro_worker`` and remember who asked for it.

        Args:
            coro_worker: worker to run; forwarded to ``put_current_from_other_service``.
            *args: positional arguments forwarded together with the worker.
            **kwargs: keyword arguments forwarded together with the worker.

        Returns:
            ``(True, None, exception)`` when creating the coroutine raised, or
            ``(False, None, None)`` when it was created and registered.
            NOTE(review): the first item presumably tells the scheduler whether a
            response is available immediately - confirm against the base service.
        """
        requester_id = self.current_caller_coro_info.coro_id
        try:
            coro: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
        except BaseException:
            # Same scope as the original bare ``except:``: every failure is
            # returned to the caller as the exception part of the result.
            ex_type, exception, traceback_ = get_exception_tripple()
            if __debug__:
                dlog(ex_type, exception, traceback_)
            exception = exception.with_traceback(traceback_)
            return True, None, exception

        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        self.called_by[coro.coro_id] = requester_id
        return False, None, None

    def full_processing_iteration(self):
        """Register every stored result as a response for its requester, then reset.

        Each delivered entry is removed from ``called_by``. Afterwards ``results``
        is replaced by a fresh empty container of the same type and
        ``make_dead()`` is called.
        """
        for coro_id, result_and_exception in self.results.items():
            result, exception = result_and_exception
            self.register_response(self.called_by[coro_id], result, exception)
            del self.called_by[coro_id]

        self.results = type(self.results)()
        self.make_dead()

    def in_work(self) -> bool:
        """Return ``thrifty_in_work`` applied to whether any results are pending."""
        result: bool = bool(self.results)
        return self.thrifty_in_work(result)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Store the outcome of a deleted coroutine for later delivery.

        Args:
            coro: wrapper of the coroutine being deleted.

        Returns:
            Always ``True``.
            NOTE(review): the meaning of the return value is defined by
            ``add_on_coro_del_handler`` - confirm.

        Raises:
            RuntimeError: if a result for this coroutine id is already stored.
        """
        if coro.coro_id in self.results:
            raise RuntimeError(f'CoroWrapperBase is already in results list. {coro.coro_id=}, {coro.last_result=}, {coro.exception=}')

        self.results[coro.coro_id] = (coro.last_result, coro.exception)
        self.make_live()
        return True


async def arun_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from awaitable code using the supplied interface ``i``.

    A worker reported by ``find_coro_type`` as ``CoroType.awaitable`` is awaited
    directly with ``i`` as its first argument; any other worker is run through
    an awaited ``RunCoro`` service request.
    """
    coro_type: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable == coro_type:
        return await coro_worker(i, *args, **kwargs)
    else:
        return await i(RunCoro, coro_worker, *args, **kwargs)


def run_coro_fast(i: Interface, coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from plain-call code using the supplied interface ``i``.

    A worker reported by ``find_coro_type`` as ``CoroType.greenlet`` is called
    directly with ``i`` as its first argument; any other worker is run through
    a ``RunCoro`` service request.
    """
    coro_type: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == coro_type:
        return coro_worker(i, *args, **kwargs)
    else:
        return i(RunCoro, coro_worker, *args, **kwargs)


async def arun_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Same as ``arun_coro_fast`` but obtains the interface via ``current_interface()``."""
    i: Interface = current_interface()
    coro_type: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable == coro_type:
        return await coro_worker(i, *args, **kwargs)
    else:
        return await i(RunCoro, coro_worker, *args, **kwargs)


def run_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Same as ``run_coro_fast`` but obtains the interface via ``current_interface()``."""
    i: Interface = current_interface()
    coro_type: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet == coro_type:
        return coro_worker(i, *args, **kwargs)
    else:
        return i(RunCoro, coro_worker, *args, **kwargs)
class RunCoro(ServiceWithADirectRequestMixin, TypedService[Any]):
    """Service that runs a worker as a coroutine and reports its outcome to the requester.

    A request hands the worker to ``put_current_from_other_service``. When the
    resulting coroutine is deleted, its ``last_result`` and ``exception`` are
    stored and later registered as the response for the requesting coroutine.
    """

    def __init__(self, loop: CoroSchedulerType):
        """Create the service with empty bookkeeping tables.

        Args:
            loop: scheduler instance passed through to the base class initializer.
        """
        super(RunCoro, self).__init__(loop)
        # key - id of the coroutine created for a request; value - id of the requester
        self.called_by: Dict[CoroID, CoroID] = dict()
        # key - id of a deleted coroutine; value - (result, exception) waiting for delivery.
        # The former list-typed assignment was dead code: it was overwritten immediately.
        self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()

    def single_task_registration_or_immediate_processing(
            self, coro_worker: AnyWorker, *args, **kwargs) -> Tuple[bool, Any, Optional[BaseException]]:
        """Create a coroutine for ``coro_worker`` and remember who asked for it.

        Args:
            coro_worker: worker to run; forwarded to ``put_current_from_other_service``.
            *args: positional arguments forwarded together with the worker.
            **kwargs: keyword arguments forwarded together with the worker.

        Returns:
            ``(True, None, exception)`` when creating the coroutine raised, or
            ``(False, None, None)`` when it was created and registered.
            NOTE(review): the first item presumably tells the scheduler whether a
            response is available immediately - confirm against the base service.
        """
        requester_id = self.current_caller_coro_info.coro_id
        try:
            coro: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
        except BaseException:
            # Same scope as the original bare ``except:``: every failure is
            # returned to the caller as the exception part of the result.
            ex_type, exception, traceback_ = get_exception_tripple()
            if __debug__:
                dlog(ex_type, exception, traceback_)
            exception = exception.with_traceback(traceback_)
            return True, None, exception

        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        self.called_by[coro.coro_id] = requester_id
        return False, None, None

    def full_processing_iteration(self):
        """Register every stored result as a response for its requester, then reset.

        Each delivered entry is removed from ``called_by``. Afterwards ``results``
        is replaced by a fresh empty container of the same type and
        ``make_dead()`` is called.
        """
        for coro_id, result_and_exception in self.results.items():
            result, exception = result_and_exception
            self.register_response(self.called_by[coro_id], result, exception)
            del self.called_by[coro_id]

        self.results = type(self.results)()
        self.make_dead()

    def in_work(self) -> bool:
        """Return ``thrifty_in_work`` applied to whether any results are pending."""
        result: bool = bool(self.results)
        return self.thrifty_in_work(result)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Store the outcome of a deleted coroutine for later delivery.

        Args:
            coro: wrapper of the coroutine being deleted.

        Returns:
            Always ``True``.
            NOTE(review): the meaning of the return value is defined by
            ``add_on_coro_del_handler`` - confirm.

        Raises:
            RuntimeError: if a result for this coroutine id is already stored.
        """
        if coro.coro_id in self.results:
            raise RuntimeError(f'CoroWrapperBase is already in results list. {coro.coro_id=}, {coro.last_result=}, {coro.exception=}')

        self.results[coro.coro_id] = (coro.last_result, coro.exception)
        self.make_live()
        return True
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]):
    def __getitem__(self, key: KT) -> VT:
        ...
        # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
def __init__(self, loop: CoroSchedulerType):
    """Create the service with empty bookkeeping tables.

    Args:
        loop: scheduler instance passed through to the base class initializer.
    """
    super(RunCoro, self).__init__(loop)
    # key - id of the coroutine created for a request; value - id of the requester
    self.called_by: Dict[CoroID, CoroID] = dict()
    # key - id of a deleted coroutine; value - (result, exception) waiting for delivery.
    # The former list-typed assignment was dead code: it was overwritten immediately.
    self.results: Dict[CoroID, Tuple[Any, Optional[BaseException]]] = dict()
def single_task_registration_or_immediate_processing(
    self, coro_worker: AnyWorker, *args, **kwargs
) -> Tuple[bool, Any, Optional[BaseException]]:
    """Create a coroutine for ``coro_worker`` and record which coroutine requested it.

    Args:
        coro_worker: worker to run; forwarded to ``put_current_from_other_service``.
        *args: positional arguments forwarded together with the worker.
        **kwargs: keyword arguments forwarded together with the worker.

    Returns:
        ``(True, None, exception)`` when creating the coroutine raised, or
        ``(False, None, None)`` when it was created and registered.
        NOTE(review): the first item presumably tells the scheduler whether a
        response is available immediately - confirm against the base service.
    """
    caller_id = self.current_caller_coro_info.coro_id
    try:
        spawned: CoroWrapperBase = put_current_from_other_service(self, coro_worker, *args, **kwargs)
    except BaseException:
        # Equivalent to a bare ``except:``; the failure is handed back to the
        # caller as the exception part of the returned tuple.
        error_type, error, error_tb = get_exception_tripple()
        if __debug__:
            dlog(error_type, error, error_tb)
        return True, None, error.with_traceback(error_tb)
    else:
        # Get notified when the new coroutine is deleted so its outcome can be collected.
        spawned.add_on_coro_del_handler(self._on_coro_del_handler)
        self.called_by[spawned.coro_id] = caller_id
        return False, None, None
def full_processing_iteration(self):
    """Deliver all collected outcomes to their requesters and clear the backlog.

    For every stored ``(result, exception)`` pair the response is registered for
    the coroutine recorded in ``called_by``, and that record is then removed.
    Afterwards ``results`` is replaced by a fresh empty container of the same
    type and ``make_dead()`` is called.
    """
    for finished_id, (value, error) in self.results.items():
        requester_id = self.called_by[finished_id]
        self.register_response(requester_id, value, error)
        del self.called_by[finished_id]

    empty_results = type(self.results)()
    self.results = empty_results
    self.make_dead()
def in_work(self) -> bool:
    """Return ``thrifty_in_work`` applied to whether any results await delivery."""
    return self.thrifty_in_work(bool(self.results))
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
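Raises: NotImplementedError: presumably raised by the base implementation when a subclass does not override this method; the RunCoro override shown above does not raise it itself.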
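Returns: bool: for RunCoro, the value of thrifty_in_work(bool(self.results)), i.e. derived from whether any results are waiting to be delivered.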
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
async def arun_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from awaitable code and return its result.

    The interface is obtained from ``current_interface()``. A worker reported by
    ``find_coro_type`` as ``CoroType.awaitable`` is awaited directly with the
    interface as its first argument; any other worker is run through an awaited
    ``RunCoro`` service request.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.awaitable != worker_kind:
        # Not directly awaitable here: let the RunCoro service run it.
        return await interface(RunCoro, coro_worker, *args, **kwargs)

    return await coro_worker(interface, *args, **kwargs)
def run_coro(coro_worker: AnyWorker, *args, **kwargs) -> Any:
    """Run ``coro_worker`` from plain-call code and return its result.

    The interface is obtained from ``current_interface()``. A worker reported by
    ``find_coro_type`` as ``CoroType.greenlet`` is called directly with the
    interface as its first argument; any other worker is run through a
    ``RunCoro`` service request.
    """
    interface: Interface = current_interface()
    worker_kind: CoroType = find_coro_type(coro_worker)
    if CoroType.greenlet != worker_kind:
        # Not directly callable here: let the RunCoro service run it.
        return interface(RunCoro, coro_worker, *args, **kwargs)

    return coro_worker(interface, *args, **kwargs)