cengal.parallel_execution.coroutines.coro_standard_services.throw_coro_list.versions.v_0.throw_coro_list
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
ThrowCoroList service: throws exceptions into a list of coroutines.

Each target is described by a ThrowSingleCoroParams item; when its ``tree``
flag is truthy the same exception is also thrown into every child coroutine
reported by the PutCoro service.
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['ThrowSingleCoroParams', 'ThrowCoroList', 'throw_coro_list_on', 'try_throw_coro_list_on', 'athrow_coro_list_on', 'atry_throw_coro_list_on', 'throw_coro_list', 'try_throw_coro_list', 'athrow_coro_list', 'atry_throw_coro_list']


from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
from cengal.code_flow_control.smart_values import ValueExistence
from cengal.introspection.inspect import get_exception, get_exception_tripple
from typing import Sequence, Tuple, List, Optional, Any, Type, cast, Dict, Union, Set


class ThrowSingleCoroParams:
    """Describes one exception to be thrown into one coroutine.

    Calling the instance yields the positional arguments that are unpacked
    into ``throw_coro_by_id``; its truth value is the ``tree`` flag.
    """

    def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Optional[Exception] = None, ex_traceback: Any = None, tree: bool = False) -> None:
        """Store the target coroutine id, the exception triple and the ``tree`` flag."""
        self.coro_id: CoroID = coro_id
        self.ex_type: Type[Exception] = ex_type
        self.ex_value: Optional[Exception] = ex_value
        self.ex_traceback: Any = ex_traceback
        self.tree: bool = tree

    def __call__(self, new_coro_id: Optional[CoroID] = None) -> Tuple:
        """Return ``(coro_id, ex_type, ex_value, ex_traceback)``.

        ``new_coro_id`` replaces the stored id unless it is ``None``.
        """
        target_id = self.coro_id if new_coro_id is None else new_coro_id
        return target_id, self.ex_type, self.ex_value, self.ex_traceback

    def __bool__(self) -> bool:
        """Return the ``tree`` flag as a real bool.

        ``__bool__`` must return a ``bool``; coercing here avoids a TypeError
        when ``tree`` was given as e.g. ``None`` or ``1``.
        """
        return bool(self.tree)


class ThrowCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that throws exceptions into the coroutines listed in a request.

    Requests are handled either immediately
    (``single_task_registration_or_immediate_processing``) or from the
    direct-request queue (``full_processing_iteration``).
    """

    def __init__(self, loop: CoroSchedulerType):
        super().__init__(loop)
        # Every entry is one coro_list passed to _add_direct_request().
        self.direct_requests: List[Sequence[ThrowSingleCoroParams]] = list()

    def single_task_registration_or_immediate_processing(
            self, coro_list: Sequence[ThrowSingleCoroParams]
    ) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
        """Throw every requested exception now and collect per-request results.

        Returns:
            A 3-tuple ``(True, results, error)``. ``results`` holds one
            ``(result, exception)`` pair per processed request, where
            ``result`` is what ``throw_coro_by_id`` returned for the target
            coroutine (``None`` if it raised) and ``exception`` is the value
            of ``get_exception()`` if anything raised for that request.
            ``error`` is ``None`` unless something outside the per-request
            handling raised.
        """
        results = list()
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            for request in coro_list:
                result = None
                exception = None
                try:
                    # Children are collected before the parent is thrown into
                    # (original order). Binding the name unconditionally keeps
                    # a stale set from an earlier request from being reused.
                    children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                    result = self._loop.throw_coro_by_id(*request())
                    for child_coro_id in children:
                        self._loop.throw_coro_by_id(*request(child_coro_id))
                except:
                    # Bare except kept on purpose: every failure is reported
                    # in the results rather than propagated.
                    exception = get_exception()

                results.append((result, exception))
        except:
            return True, results, get_exception()

        return True, results, None

    def full_processing_iteration(self):
        """Process all queued direct requests, then mark the service dead.

        NOTE(review): an exception from any throw is logged (debug builds)
        and re-raised, so the rest of the buffered requests are not processed
        and ``make_dead()`` is not reached on that path.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        # Swap the queue for a fresh container of the same type, so anything
        # appended during processing lands in the new queue.
        pending_requests = self.direct_requests
        self.direct_requests = type(pending_requests)()
        for coro_list in pending_requests:
            for request in coro_list:
                try:
                    children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                    self._loop.throw_coro_by_id(*request())
                    for child_coro_id in children:
                        self._loop.throw_coro_by_id(*request(child_coro_id))
                except:
                    ex_type, exception, traceback_ = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, traceback_)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[None]:
        """Queue ``coro_list``, mark the service live and return ``(False, None)``."""
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Return ``thrifty_in_work()`` of whether any direct requests are queued."""
        result: bool = bool(self.direct_requests)
        return self.thrifty_in_work(result)


def throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Delegate to ``make_request_to_service_with_context`` for ThrowCoroList."""
    return make_request_to_service_with_context(context, ThrowCoroList, coro_list)


def try_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Delegate to ``try_make_request_to_service_with_context`` for ThrowCoroList."""
    return try_make_request_to_service_with_context(context, ThrowCoroList, coro_list)


async def athrow_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service_with_context`` for ThrowCoroList."""
    return await amake_request_to_service_with_context(context, ThrowCoroList, coro_list)


async def atry_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service_with_context`` for ThrowCoroList."""
    return await atry_make_request_to_service_with_context(context, ThrowCoroList, coro_list)


def throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Delegate to ``make_request_to_service`` for ThrowCoroList."""
    return make_request_to_service(ThrowCoroList, coro_list)


def try_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Delegate to ``try_make_request_to_service`` for ThrowCoroList."""
    return try_make_request_to_service(ThrowCoroList, coro_list)


async def athrow_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service`` for ThrowCoroList."""
    return await amake_request_to_service(ThrowCoroList, coro_list)


async def atry_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service`` for ThrowCoroList."""
    return await atry_make_request_to_service(ThrowCoroList, coro_list)
class ThrowSingleCoroParams:
    """Describes one exception to be thrown into one coroutine.

    Calling the instance returns the tuple
    ``(coro_id, ex_type, ex_value, ex_traceback)``; its truth value is the
    ``tree`` flag.
    """

    def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Optional[Exception] = None, ex_traceback: Any = None, tree: bool = False) -> None:
        """Store the target coroutine id, the exception triple and the ``tree`` flag."""
        self.coro_id: CoroID = coro_id
        self.ex_type: Type[Exception] = ex_type
        self.ex_value: Optional[Exception] = ex_value
        self.ex_traceback: Any = ex_traceback
        self.tree: bool = tree

    def __call__(self, new_coro_id: Optional[CoroID] = None) -> Tuple:
        """Return ``(coro_id, ex_type, ex_value, ex_traceback)``.

        ``new_coro_id`` replaces the stored id unless it is ``None`` (so a
        falsy id such as ``0`` is still honoured).
        """
        target_id = self.coro_id if new_coro_id is None else new_coro_id
        return target_id, self.ex_type, self.ex_value, self.ex_traceback

    def __bool__(self) -> bool:
        """Return the ``tree`` flag as a real bool.

        ``__bool__`` must return a ``bool``; coercing here avoids a TypeError
        when ``tree`` was given as e.g. ``None`` or ``1``.
        """
        return bool(self.tree)
def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Exception = None, ex_traceback: Any = None, tree: bool = False) -> None:
    """Remember what to throw and where.

    Args:
        coro_id: id of the coroutine the exception is aimed at.
        ex_type: exception class.
        ex_value: exception instance (defaults to ``None``).
        ex_traceback: traceback object (defaults to ``None``).
        tree: flag stored as ``self.tree`` (defaults to ``False``).
    """
    # The exception triple.
    self.ex_type: Type[Exception] = ex_type
    self.ex_value: Exception = ex_value
    self.ex_traceback: Any = ex_traceback
    # The target and the tree flag.
    self.coro_id: CoroID = coro_id
    self.tree: bool = tree
class ThrowCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that throws exceptions into the coroutines listed in a request.

    Requests are handled either immediately
    (``single_task_registration_or_immediate_processing``) or from the
    direct-request queue (``full_processing_iteration``). A request whose
    truth value is true is also applied to every child coroutine reported by
    the PutCoro service.
    """

    def __init__(self, loop: CoroSchedulerType):
        super().__init__(loop)
        # Every entry is one coro_list passed to _add_direct_request().
        self.direct_requests: List[Sequence[ThrowSingleCoroParams]] = list()

    def single_task_registration_or_immediate_processing(
            self, coro_list: Sequence[ThrowSingleCoroParams]
    ) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
        """Throw every requested exception now and collect per-request results.

        Returns:
            A 3-tuple ``(True, results, error)``. ``results`` holds one
            ``(result, exception)`` pair per processed request, where
            ``result`` is what ``throw_coro_by_id`` returned for the target
            coroutine (``None`` if it raised) and ``exception`` is the value
            of ``get_exception()`` if anything raised for that request.
            ``error`` is ``None`` unless something outside the per-request
            handling raised.
        """
        results = list()
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            for request in coro_list:
                result = None
                exception = None
                try:
                    # Children are collected before the parent is thrown into
                    # (original order). Binding the name unconditionally keeps
                    # a stale set from an earlier request from being reused.
                    children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                    result = self._loop.throw_coro_by_id(*request())
                    for child_coro_id in children:
                        self._loop.throw_coro_by_id(*request(child_coro_id))
                except:
                    # Bare except kept on purpose: every failure is reported
                    # in the results rather than propagated.
                    exception = get_exception()

                results.append((result, exception))
        except:
            return True, results, get_exception()

        return True, results, None

    def full_processing_iteration(self):
        """Process all queued direct requests, then mark the service dead.

        NOTE(review): an exception from any throw is logged (debug builds)
        and re-raised, so the rest of the buffered requests are not processed
        and ``make_dead()`` is not reached on that path.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        # Swap the queue for a fresh container of the same type, so anything
        # appended during processing lands in the new queue.
        pending_requests = self.direct_requests
        self.direct_requests = type(pending_requests)()
        for coro_list in pending_requests:
            for request in coro_list:
                try:
                    children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                    self._loop.throw_coro_by_id(*request())
                    for child_coro_id in children:
                        self._loop.throw_coro_by_id(*request(child_coro_id))
                except:
                    ex_type, exception, traceback_ = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, traceback_)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[None]:
        """Queue ``coro_list``, mark the service live and return ``(False, None)``."""
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Return ``thrifty_in_work()`` of whether any direct requests are queued."""
        result: bool = bool(self.direct_requests)
        return self.thrifty_in_work(result)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]):
    def __getitem__(self, key: KT) -> VT:
        ...
        # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
def single_task_registration_or_immediate_processing(
        self, coro_list: Sequence[ThrowSingleCoroParams]
) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
    """Throw every requested exception now and collect per-request results.

    For each request the exception is thrown into ``request.coro_id``; when
    the request is truthy it is then thrown into every child coroutine
    returned by ``PutCoro.get_set_of_all_children``.

    Returns:
        A 3-tuple ``(True, results, error)``. ``results`` holds one
        ``(result, exception)`` pair per processed request, where ``result``
        is what ``throw_coro_by_id`` returned for the target coroutine
        (``None`` if it raised) and ``exception`` is the value of
        ``get_exception()`` if anything raised for that request. ``error`` is
        ``None`` unless something outside the per-request handling raised.
    """
    results = list()
    try:
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        for request in coro_list:
            result = None
            exception = None
            try:
                # Children are collected before the parent is thrown into
                # (original order). Binding the name unconditionally keeps a
                # stale set from an earlier request from being reused.
                children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                result = self._loop.throw_coro_by_id(*request())
                for child_coro_id in children:
                    self._loop.throw_coro_by_id(*request(child_coro_id))
            except:
                # Bare except kept on purpose: every failure is reported in
                # the results rather than propagated.
                exception = get_exception()

            results.append((result, exception))
    except:
        return True, results, get_exception()

    return True, results, None
def full_processing_iteration(self):
    """Process all queued direct requests, then mark the service dead.

    For each request the exception is thrown into ``request.coro_id``; when
    the request is truthy it is then thrown into every child coroutine
    returned by ``PutCoro.get_set_of_all_children``.

    NOTE(review): an exception from any throw is logged (debug builds) and
    re-raised, so the rest of the buffered requests are not processed and
    ``make_dead()`` is not reached on that path.
    """
    put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
    # Swap the queue for a fresh container of the same type, so anything
    # appended during processing lands in the new queue.
    pending_requests = self.direct_requests
    self.direct_requests = type(pending_requests)()
    for coro_list in pending_requests:
        for request in coro_list:
            try:
                # Bound unconditionally so a stale set from an earlier
                # request can never be reused.
                children = put_coro.get_set_of_all_children(request.coro_id) if request else ()
                self._loop.throw_coro_by_id(*request())
                for child_coro_id in children:
                    self._loop.throw_coro_by_id(*request(child_coro_id))
            except:
                ex_type, exception, traceback_ = get_exception_tripple()
                if __debug__: dlog(ex_type, exception, traceback_)
                raise

    self.make_dead()
def in_work(self) -> bool:
    """Report the service's work state.

    The flag "there is at least one queued direct request" is computed and
    handed to ``thrifty_in_work()``, whose return value is passed back.
    """
    return self.thrifty_in_work(bool(self.direct_requests))
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
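Raises: NotImplementedError: presumably raised by the base-class implementation when a service does not override this method (the override shown above does not raise it).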
Returns: bool: the value returned by thrifty_in_work() for the flag "at least one direct request is queued".
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
def try_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to the ThrowCoroList service using ``context``.

    Thin wrapper: forwards to ``try_make_request_to_service_with_context``
    and returns its result unchanged.
    """
    response = try_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
    return response
async def athrow_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to the ThrowCoroList service using ``context`` (awaitable).

    Thin wrapper: awaits ``amake_request_to_service_with_context`` and
    returns its result unchanged.
    """
    response = await amake_request_to_service_with_context(context, ThrowCoroList, coro_list)
    return response
async def atry_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to the ThrowCoroList service using ``context`` (awaitable).

    Thin wrapper: awaits ``atry_make_request_to_service_with_context`` and
    returns its result unchanged.
    """
    response = await atry_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
    return response