cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list.versions.v_0.kill_coro_list
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
KillCoroList service and its request helpers.

The service kills coroutines given as a list whose items are either plain
coroutine IDs or KillSingleCoroParams instances; the latter can additionally
ask for all children of the coroutine to be killed as well.
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['KillSingleCoroParams', 'KillCoroList', 'kill_coro_list_on', 'try_kill_coro_list_on', 'akill_coro_list_on', 'atry_kill_coro_list_on', 'kill_coro_list', 'try_kill_coro_list', 'akill_coro_list', 'atry_kill_coro_list']


from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
from cengal.code_flow_control.smart_values import ValueExistence
from cengal.introspection.inspect import get_exception, get_exception_tripple
from typing import Sequence, Tuple, List, Optional, Any, cast, Dict, Union, Set


class KillSingleCoroParams:
    """Kill request for a single coroutine, optionally including all of its children."""

    def __init__(self, coro_id: CoroID, tree: bool = False) -> None:
        # ID of the coroutine to kill.
        self.coro_id: CoroID = coro_id
        # When truthy, KillCoroList also kills every child of ``coro_id``.
        self.tree: bool = tree

    def __call__(self, new_coro_id: Optional[CoroID] = None) -> CoroID:
        """Return ``new_coro_id`` when it is given, otherwise the stored ``coro_id``.

        The instance itself is not modified.
        """
        return self.coro_id if new_coro_id is None else new_coro_id

    def __bool__(self) -> bool:
        """Return True when the children of ``coro_id`` must be killed too."""
        # __bool__ must return a real bool; coercing keeps truth-testing working
        # even when ``tree`` was given as another truthy/falsy value (e.g. 1 or 0).
        return bool(self.tree)


class KillCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that kills the coroutines named in a list of requests.

    Each request is either a coroutine ID or a KillSingleCoroParams instance.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(KillCoroList, self).__init__(loop)
        # Request lists queued by _add_direct_request(); drained by full_processing_iteration().
        self.direct_requests: List[Sequence[Union[CoroID, KillSingleCoroParams]]] = list()

    def _kill_targets(
        self, put_coro: PutCoro, request: Union[CoroID, KillSingleCoroParams]
    ) -> Tuple[CoroID, Set[CoroID]]:
        """Return the coroutine ID named by ``request`` and the children to kill with it.

        The set of children is empty unless ``request`` is a truthy KillSingleCoroParams.
        """
        if isinstance(request, KillSingleCoroParams):
            coro_id = request.coro_id
            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id) if request else set()
            return coro_id, children

        return request, set()

    def single_task_registration_or_immediate_processing(
        self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
    ) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
        """Kill every coroutine requested in ``coro_list`` immediately.

        Returns:
            A 3-tuple ``(True, results, exception)``. ``results`` holds one
            ``(result, exception)`` pair per processed request: ``result`` is what
            ``kill_coro_by_id`` returned for the requested coroutine (``None`` if that
            call did not complete) and ``exception`` is what ``get_exception()`` returned
            if handling the request raised, else ``None``. The last element is the
            exception caught outside the per-request handling, or ``None``.
        """
        results: List[Tuple[Optional[bool], Optional[Exception]]] = list()
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            for request in coro_list:
                exception = None
                result = None
                try:
                    # Children are collected before the parent is killed (original ordering kept).
                    coro_id, children = self._kill_targets(put_coro, request)
                    result = self._loop.kill_coro_by_id(coro_id)
                    for child in children:
                        self._loop.kill_coro_by_id(child)
                except:
                    # Deliberately broad: a failure is reported in the results, not raised.
                    exception = get_exception()

                results.append((result, exception))
        except:
            return True, results, get_exception()

        return True, results, None

    def full_processing_iteration(self):
        """Process every queued direct request, then mark the service as dead.

        An exception raised while handling a request is logged (debug builds only)
        and re-raised; requests not yet handled at that point are not re-queued here.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        # Swap the queue out first: requests added from now on go to a fresh container.
        direct_requests_buff = self.direct_requests
        self.direct_requests = type(direct_requests_buff)()
        for coro_list in direct_requests_buff:
            for request in coro_list:
                try:
                    coro_id, children = self._kill_targets(put_coro, request)
                    self._loop.kill_coro_by_id(coro_id)
                    for child in children:
                        self._loop.kill_coro_by_id(child)
                except:
                    ex_type, exception, tracback = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, tracback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[None]:
        """Queue ``coro_list`` for full_processing_iteration() and mark the service as live.

        Always returns the tuple ``(False, None)``.
        """
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Return ``thrifty_in_work()`` applied to whether any direct requests are queued."""
        result: bool = bool(self.direct_requests)
        return self.thrifty_in_work(result)


def kill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to KillCoroList via ``make_request_to_service_with_context`` using ``context``."""
    return make_request_to_service_with_context(context, KillCoroList, coro_list)


def try_kill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to KillCoroList via ``try_make_request_to_service_with_context`` using ``context``."""
    return try_make_request_to_service_with_context(context, KillCoroList, coro_list)


async def akill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service_with_context`` to send ``coro_list`` to KillCoroList using ``context``."""
    return await amake_request_to_service_with_context(context, KillCoroList, coro_list)


async def atry_kill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service_with_context`` to send ``coro_list`` to KillCoroList using ``context``."""
    return await atry_make_request_to_service_with_context(context, KillCoroList, coro_list)


def kill_coro_list(coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to KillCoroList via ``make_request_to_service``."""
    return make_request_to_service(KillCoroList, coro_list)


def try_kill_coro_list(coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to KillCoroList via ``try_make_request_to_service``."""
    return try_make_request_to_service(KillCoroList, coro_list)


async def akill_coro_list(coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service`` to send ``coro_list`` to KillCoroList."""
    return await amake_request_to_service(KillCoroList, coro_list)


async def atry_kill_coro_list(coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service`` to send ``coro_list`` to KillCoroList."""
    return await atry_make_request_to_service(KillCoroList, coro_list)
class KillSingleCoroParams:
    """Kill request for a single coroutine, optionally including all of its children."""

    def __init__(self, coro_id: CoroID, tree: bool = False) -> None:
        # ID of the coroutine to kill.
        self.coro_id: CoroID = coro_id
        # When truthy, KillCoroList also kills every child of ``coro_id``.
        self.tree: bool = tree

    def __call__(self, new_coro_id: Optional[CoroID] = None) -> CoroID:
        """Return ``new_coro_id`` when it is given, otherwise the stored ``coro_id``.

        The instance itself is not modified.
        """
        return self.coro_id if new_coro_id is None else new_coro_id

    def __bool__(self) -> bool:
        """Return True when the children of ``coro_id`` must be killed too."""
        # __bool__ must return a real bool; coercing keeps truth-testing working
        # even when ``tree`` was given as another truthy/falsy value (e.g. 1 or 0).
        return bool(self.tree)
class KillCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that kills the coroutines named in a list of requests.

    Each request is either a coroutine ID or a KillSingleCoroParams instance.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(KillCoroList, self).__init__(loop)
        # Request lists queued by _add_direct_request(); drained by full_processing_iteration().
        self.direct_requests: List[Sequence[Union[CoroID, KillSingleCoroParams]]] = list()

    def _kill_targets(
        self, put_coro: PutCoro, request: Union[CoroID, KillSingleCoroParams]
    ) -> Tuple[CoroID, Set[CoroID]]:
        """Return the coroutine ID named by ``request`` and the children to kill with it.

        The set of children is empty unless ``request`` is a truthy KillSingleCoroParams.
        """
        if isinstance(request, KillSingleCoroParams):
            coro_id = request.coro_id
            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id) if request else set()
            return coro_id, children

        return request, set()

    def single_task_registration_or_immediate_processing(
        self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
    ) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
        """Kill every coroutine requested in ``coro_list`` immediately.

        Returns:
            A 3-tuple ``(True, results, exception)``. ``results`` holds one
            ``(result, exception)`` pair per processed request: ``result`` is what
            ``kill_coro_by_id`` returned for the requested coroutine (``None`` if that
            call did not complete) and ``exception`` is what ``get_exception()`` returned
            if handling the request raised, else ``None``. The last element is the
            exception caught outside the per-request handling, or ``None``.
        """
        results: List[Tuple[Optional[bool], Optional[Exception]]] = list()
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            for request in coro_list:
                exception = None
                result = None
                try:
                    # Children are collected before the parent is killed (original ordering kept).
                    coro_id, children = self._kill_targets(put_coro, request)
                    # ``result`` is assigned before the children are killed, so a failing
                    # child kill still reports the parent's result next to the exception.
                    result = self._loop.kill_coro_by_id(coro_id)
                    for child in children:
                        self._loop.kill_coro_by_id(child)
                except:
                    # Deliberately broad: a failure is reported in the results, not raised.
                    exception = get_exception()

                results.append((result, exception))
        except:
            return True, results, get_exception()

        return True, results, None

    def full_processing_iteration(self):
        """Process every queued direct request, then mark the service as dead.

        An exception raised while handling a request is logged (debug builds only)
        and re-raised; requests not yet handled at that point are not re-queued here.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        # Swap the queue out first: requests added from now on go to a fresh container.
        direct_requests_buff = self.direct_requests
        self.direct_requests = type(direct_requests_buff)()
        for coro_list in direct_requests_buff:
            for request in coro_list:
                try:
                    coro_id, children = self._kill_targets(put_coro, request)
                    self._loop.kill_coro_by_id(coro_id)
                    for child in children:
                        self._loop.kill_coro_by_id(child)
                except:
                    ex_type, exception, tracback = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, tracback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[None]:
        """Queue ``coro_list`` for full_processing_iteration() and mark the service as live.

        Always returns the tuple ``(False, None)``.
        """
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Return ``thrifty_in_work()`` applied to whether any direct requests are queued."""
        result: bool = bool(self.direct_requests)
        return self.thrifty_in_work(result)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def single_task_registration_or_immediate_processing(
    self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> Tuple[bool, Sequence[Tuple[Optional[bool], Optional[Exception]]], Any]:
    """Kill every coroutine requested in ``coro_list`` immediately.

    Each item of ``coro_list`` is either a coroutine ID or a KillSingleCoroParams;
    a truthy KillSingleCoroParams also gets all children reported by
    ``PutCoro.get_set_of_all_children`` killed.

    Returns:
        A 3-tuple ``(True, results, exception)``. ``results`` holds one
        ``(result, exception)`` pair per processed request: ``result`` is what
        ``kill_coro_by_id`` returned for the requested coroutine (``None`` if that
        call did not complete) and ``exception`` is what ``get_exception()`` returned
        if handling the request raised, else ``None``. The last element is the
        exception caught outside the per-request handling, or ``None``.
    """
    results: List[Tuple[Optional[bool], Optional[Exception]]] = list()
    try:
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        for request in coro_list:
            exception = None
            result = None
            try:
                if isinstance(request, KillSingleCoroParams):
                    coro_id = request.coro_id
                    # Children are collected before the parent is killed (original ordering kept).
                    children = put_coro.get_set_of_all_children(coro_id) if request else ()
                else:
                    coro_id = request
                    children = ()

                # ``result`` is assigned before the children are killed, so a failing
                # child kill still reports the parent's result next to the exception.
                result = self._loop.kill_coro_by_id(coro_id)
                for child in children:
                    self._loop.kill_coro_by_id(child)
            except:
                # Deliberately broad: a failure is reported in the results, not raised.
                exception = get_exception()

            results.append((result, exception))
    except:
        return True, results, get_exception()

    return True, results, None
def full_processing_iteration(self):
    """Drain the queued direct requests, kill what they name, then call make_dead().

    Every item of every queued list is either a coroutine ID or a
    KillSingleCoroParams; for a truthy KillSingleCoroParams the children returned by
    ``PutCoro.get_set_of_all_children`` are killed after the coroutine itself.

    Any exception raised while handling an item is passed to ``dlog`` when
    ``__debug__`` is set and then re-raised. Because the queue is replaced before
    processing starts, items not yet handled at that point are not re-queued here.
    """
    children_lookup: PutCoro = self._loop.get_service_instance(PutCoro)

    # Detach the current queue and install an empty container of the same type.
    pending_batches = self.direct_requests
    self.direct_requests = type(pending_batches)()

    for batch in pending_batches:
        for item in batch:
            try:
                if isinstance(item, KillSingleCoroParams):
                    target_id = item.coro_id
                    # Descendants are looked up before the target itself is killed.
                    descendants = children_lookup.get_set_of_all_children(target_id) if item else ()
                else:
                    target_id = item
                    descendants = ()

                self._loop.kill_coro_by_id(target_id)
                for descendant in descendants:
                    self._loop.kill_coro_by_id(descendant)
            except:
                exc_type, exc_value, exc_traceback = get_exception_tripple()
                if __debug__:
                    dlog(exc_type, exc_value, exc_traceback)
                raise

    self.make_dead()
def in_work(self) -> bool:
    """Return ``thrifty_in_work()`` applied to whether any direct requests are queued."""
    return self.thrifty_in_work(bool(self.direct_requests))
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: for KillCoroList, the value thrifty_in_work() returns when given whether any direct requests are queued.
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
def try_kill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to KillCoroList via ``try_make_request_to_service_with_context``.

    ``context`` is forwarded unchanged; the helper's return value is returned as is.
    """
    response = try_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
async def akill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service_with_context`` to send ``coro_list`` to KillCoroList.

    ``context`` is forwarded unchanged; the awaited value is returned as is.
    """
    response = await amake_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
async def atry_kill_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service_with_context`` to send ``coro_list`` to KillCoroList.

    ``context`` is forwarded unchanged; the awaited value is returned as is.
    """
    response = await atry_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response