cengal.parallel_execution.coroutines.coro_tools.coro_flow_control.versions.v_0.coro_flow_control
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['GracefulCoroDestroy', 'graceful_coro_destroyer', 'agraceful_coro_destroyer', 'set_my_exit_handler', 'set_exit_handler', 'kill_coro_children', 'kill_my_children_on_exit', 'kill_children_on_exit']


from typing import Optional, Type, Any, Hashable, cast, Set, Callable
from cengal.parallel_execution.coroutines.coro_scheduler import CoroScheduler, CoroSchedulerType, Interface, CoroID, ExplicitWorker, Worker, CoroWrapperBase, get_interface_for_an_explicit_loop, current_interface, current_coro_scheduler
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro, put_coro_to, put_coro, get_set_of_all_coro_children
from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield
from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import KillCoro, kill_coro
from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list import KillCoroList, kill_coro_list
from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro import ThrowCoro
from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest, CoroutineNotFoundError
from cengal.code_flow_control.smart_values import ValueExistence


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class GracefulCoroDestroy(Exception):
    """Exception thrown into a coroutine by the graceful destroyers in this module."""


def graceful_coro_destroyer(
        i: Interface,
        phase_time_limit: Optional[float],
        coro_id: CoroID,
        ex_type: Optional[Type[Exception]] = None, ex_value: Optional[Exception] = None, ex_traceback: Any = None,
        tree: bool = True,
        first_phase_is_wait: bool = True,
        last_phase_is_kill: bool = True,
    ) -> None:
    """Stop coroutine ``coro_id`` in escalating phases.

    Phases, in order:

    1. If ``first_phase_is_wait``: issue a ``WaitCoro`` request for the
       coroutine (``kill_on_timeout=False``).
    2. If ``ex_type`` or ``ex_value`` is given: throw it via ``ThrowCoro``,
       then wait again (``kill_on_timeout=False``).
    3. Throw ``GracefulCoroDestroy`` via ``ThrowCoro``, then wait once more
       with ``kill_on_timeout=last_phase_is_kill``.

    A ``TimeoutError`` from any wait is ignored and the next phase starts.
    A falsy ``ThrowCoro`` result, or a ``CoroutineNotFoundError`` raised by
    any request, silently ends the sequence. A ``Yield`` request is always
    issued before returning.

    Args:
        i: Interface used to issue every request.
        phase_time_limit: First argument of each ``WaitCoroRequest``; a falsy
            value (``None`` included) is replaced with ``0``.
        coro_id: ID of the coroutine to stop.
        ex_type: Optional exception type for phase 2.
        ex_value: Optional exception value for phase 2.
        ex_traceback: Optional traceback forwarded to ``ThrowCoro`` in phase 2.
        tree: Forwarded to every ``WaitCoroRequest`` and ``ThrowCoro`` call.
            NOTE(review): presumably extends the operation to the
            coroutine's children — confirm against those services.
        first_phase_is_wait: Whether to run phase 1.
        last_phase_is_kill: ``kill_on_timeout`` value for the final wait.
    """
    phase_time_limit = phase_time_limit or 0
    try:
        if first_phase_is_wait:
            try:
                i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=False, tree=tree, result_required=False).single(coro_id))
            except TimeoutError:
                pass

        if (ex_type is not None) or (ex_value is not None):
            # A falsy ThrowCoro result is treated the same as "coroutine not found".
            if not i(ThrowCoro, coro_id, ex_type, ex_value, ex_traceback, tree):
                raise CoroutineNotFoundError

            try:
                i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=False, tree=tree, result_required=False).single(coro_id))
            except TimeoutError:
                pass

        if not i(ThrowCoro, coro_id, GracefulCoroDestroy, tree=tree):
            raise CoroutineNotFoundError

        try:
            i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=last_phase_is_kill, tree=tree, result_required=False).single(coro_id))
        except TimeoutError:
            pass

    except CoroutineNotFoundError:
        # Nothing left to destroy.
        pass
    finally:
        i(Yield)


async def agraceful_coro_destroyer(
        i: Interface,
        phase_time_limit: Optional[float],
        coro_id: CoroID,
        ex_type: Optional[Type[Exception]] = None, ex_value: Optional[Exception] = None, ex_traceback: Any = None,
        tree: bool = True,
        first_phase_is_wait: bool = True,
        last_phase_is_kill: bool = True,
    ) -> None:
    """Awaitable counterpart of ``graceful_coro_destroyer``.

    Runs the same phases, awaiting every request made through ``i``:

    1. If ``first_phase_is_wait``: await a ``WaitCoro`` request
       (``kill_on_timeout=False``).
    2. If ``ex_type`` or ``ex_value`` is given: throw it via ``ThrowCoro``,
       then wait again (``kill_on_timeout=False``).
    3. Throw ``GracefulCoroDestroy`` via ``ThrowCoro``, then wait once more
       with ``kill_on_timeout=last_phase_is_kill``.

    A ``TimeoutError`` from any wait is ignored. A falsy ``ThrowCoro`` result
    or a ``CoroutineNotFoundError`` silently ends the sequence. A ``Yield``
    request is always awaited before returning.

    Args:
        i: Interface used to issue every request.
        phase_time_limit: First argument of each ``WaitCoroRequest``; a falsy
            value (``None`` included) is replaced with ``0``.
        coro_id: ID of the coroutine to stop.
        ex_type: Optional exception type for phase 2.
        ex_value: Optional exception value for phase 2.
        ex_traceback: Optional traceback forwarded to ``ThrowCoro`` in phase 2.
        tree: Forwarded to every ``WaitCoroRequest`` and ``ThrowCoro`` call.
        first_phase_is_wait: Whether to run phase 1.
        last_phase_is_kill: ``kill_on_timeout`` value for the final wait.
    """
    phase_time_limit = phase_time_limit or 0
    try:
        if first_phase_is_wait:
            try:
                await i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=False, tree=tree, result_required=False).single(coro_id))
            except TimeoutError:
                pass

        if (ex_type is not None) or (ex_value is not None):
            # A falsy ThrowCoro result is treated the same as "coroutine not found".
            if not await i(ThrowCoro, coro_id, ex_type, ex_value, ex_traceback, tree):
                raise CoroutineNotFoundError

            try:
                await i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=False, tree=tree, result_required=False).single(coro_id))
            except TimeoutError:
                pass

        if not await i(ThrowCoro, coro_id, GracefulCoroDestroy, tree=tree):
            raise CoroutineNotFoundError

        try:
            await i(WaitCoro, WaitCoroRequest(phase_time_limit, kill_on_timeout=last_phase_is_kill, tree=tree, result_required=False).single(coro_id))
        except TimeoutError:
            pass

    except CoroutineNotFoundError:
        # Nothing left to destroy.
        pass
    finally:
        await i(Yield)


def set_my_exit_handler(handler: Callable[[CoroWrapperBase], Optional[bool]]) -> bool:
    """Register ``handler`` on the current coroutine via ``add_on_coro_del_handler``.

    Args:
        handler: Callable taking the coroutine wrapper.

    Returns:
        ``True`` once the handler is registered, ``False`` when no current
        interface is available.
    """
    i: Optional[Interface] = current_interface()
    if i is None:
        # NOTE(review): mirrors the None check in set_exit_handler; confirm
        # that current_interface() can actually return None here.
        return False

    coro = cast(CoroWrapperBase, i._coro)
    coro.add_on_coro_del_handler(handler)
    return True


def set_exit_handler(handler: Callable[[CoroWrapperBase], Optional[bool]], coro_id: Optional[CoroID] = None) -> bool:
    """Register ``handler`` on a coroutine via ``add_on_coro_del_handler``.

    Args:
        handler: Callable taking the coroutine wrapper.
        coro_id: Target coroutine ID. When ``None``, the ID of the current
            scheduler's current interface is used.

    Returns:
        ``True`` once the handler is registered, ``False`` when the target
        coroutine cannot be determined or found.
    """
    current_loop: CoroSchedulerType = current_coro_scheduler()
    if coro_id is None:
        i: Optional[Interface] = current_loop.current_interface()
        if i is None:
            return False

        coro_id = i.coro_id

    if coro_id is None:
        return False

    coro, _was_new_born, _new_born_index = current_loop.find_coro_by_id(coro_id)
    if coro is None:
        # NOTE(review): assumes find_coro_by_id reports an unknown ID with
        # None — confirm against the scheduler.
        return False

    cast(CoroWrapperBase, coro).add_on_coro_del_handler(handler)
    return True


def kill_coro_children(coro: CoroWrapperBase) -> None:
    """Kill every coroutine reported as a child of ``coro``.

    Each ID returned by ``get_set_of_all_coro_children(coro.coro_id)`` is
    passed to ``kill_coro`` with ``tree=True``.
    """
    children: Set[CoroID] = get_set_of_all_coro_children(coro.coro_id)
    for child_id in children:
        kill_coro(child_id, tree=True)


def kill_my_children_on_exit() -> bool:
    """Register ``kill_coro_children`` on the current coroutine.

    Returns:
        The result of ``set_my_exit_handler(kill_coro_children)``.
    """
    return set_my_exit_handler(kill_coro_children)


def kill_children_on_exit(coro_id: Optional[CoroID] = None) -> bool:
    """Register ``kill_coro_children`` on coroutine ``coro_id``.

    Args:
        coro_id: Target coroutine ID; ``None`` selects the current coroutine.

    Returns:
        The result of ``set_exit_handler(kill_coro_children, coro_id)``.
    """
    return set_exit_handler(kill_coro_children, coro_id)
class
GracefulCoroDestroy(builtins.Exception):
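Exception thrown into a coroutine by graceful_coro_destroyer and agraceful_coro_destroyer. The class defines no docstring of its own; it derives from builtins.Exception, the common base class for all non-exit exceptions.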
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
def
graceful_coro_destroyer( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, phase_time_limit: Union[float, NoneType], coro_id: int, ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None, tree: bool = True, first_phase_is_wait: bool = True, last_phase_is_kill: bool = True):
def graceful_coro_destroyer(
        i: Interface,
        phase_time_limit: Optional[float],
        coro_id: CoroID,
        ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None,
        tree: bool = True,
        first_phase_is_wait: bool = True,
        last_phase_is_kill: bool = True,
    ):
    """Stop coroutine ``coro_id`` in escalating phases.

    Order of phases: an optional plain wait (``first_phase_is_wait``); an
    optional throw of the caller's exception followed by a wait (only when
    ``ex_type`` or ``ex_value`` is given); a throw of ``GracefulCoroDestroy``
    followed by a final wait issued with ``kill_on_timeout=last_phase_is_kill``.

    ``TimeoutError`` from a wait is ignored. A falsy ``ThrowCoro`` result or
    a ``CoroutineNotFoundError`` ends the sequence silently. ``Yield`` is
    always requested before returning. A falsy ``phase_time_limit`` is
    replaced with ``0``.
    """
    timeout = phase_time_limit or 0

    def wait_request(kill_on_timeout: bool):
        # Build the WaitCoro request for one phase.
        return WaitCoroRequest(
            timeout,
            kill_on_timeout=kill_on_timeout,
            tree=tree,
            result_required=False,
        ).single(coro_id)

    try:
        if first_phase_is_wait:
            try:
                i(WaitCoro, wait_request(False))
            except TimeoutError:
                pass

        custom_exception_given = not (ex_type is None and ex_value is None)
        if custom_exception_given:
            thrown = i(ThrowCoro, coro_id, ex_type, ex_value, ex_traceback, tree)
            if not thrown:
                raise CoroutineNotFoundError

            try:
                i(WaitCoro, wait_request(False))
            except TimeoutError:
                pass

        thrown = i(ThrowCoro, coro_id, GracefulCoroDestroy, tree=tree)
        if not thrown:
            raise CoroutineNotFoundError

        try:
            i(WaitCoro, wait_request(last_phase_is_kill))
        except TimeoutError:
            pass
    except CoroutineNotFoundError:
        # Nothing left to destroy.
        pass
    finally:
        i(Yield)
async def
agraceful_coro_destroyer( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, phase_time_limit: Union[float, NoneType], coro_id: int, ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None, tree: bool = True, first_phase_is_wait: bool = True, last_phase_is_kill: bool = True):
async def agraceful_coro_destroyer(
        i: Interface,
        phase_time_limit: Optional[float],
        coro_id: CoroID,
        ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None,
        tree: bool = True,
        first_phase_is_wait: bool = True,
        last_phase_is_kill: bool = True,
    ):
    """Stop coroutine ``coro_id`` in escalating phases, awaiting each request.

    Order of phases: an optional plain wait (``first_phase_is_wait``); an
    optional throw of the caller's exception followed by a wait (only when
    ``ex_type`` or ``ex_value`` is given); a throw of ``GracefulCoroDestroy``
    followed by a final wait issued with ``kill_on_timeout=last_phase_is_kill``.

    ``TimeoutError`` from a wait is ignored. A falsy ``ThrowCoro`` result or
    a ``CoroutineNotFoundError`` ends the sequence silently. ``Yield`` is
    always awaited before returning. A falsy ``phase_time_limit`` is
    replaced with ``0``.
    """
    timeout = phase_time_limit or 0

    def wait_request(kill_on_timeout: bool):
        # Build the WaitCoro request for one phase.
        return WaitCoroRequest(
            timeout,
            kill_on_timeout=kill_on_timeout,
            tree=tree,
            result_required=False,
        ).single(coro_id)

    try:
        if first_phase_is_wait:
            try:
                await i(WaitCoro, wait_request(False))
            except TimeoutError:
                pass

        custom_exception_given = not (ex_type is None and ex_value is None)
        if custom_exception_given:
            thrown = await i(ThrowCoro, coro_id, ex_type, ex_value, ex_traceback, tree)
            if not thrown:
                raise CoroutineNotFoundError

            try:
                await i(WaitCoro, wait_request(False))
            except TimeoutError:
                pass

        thrown = await i(ThrowCoro, coro_id, GracefulCoroDestroy, tree=tree)
        if not thrown:
            raise CoroutineNotFoundError

        try:
            await i(WaitCoro, wait_request(last_phase_is_kill))
        except TimeoutError:
            pass
    except CoroutineNotFoundError:
        # Nothing left to destroy.
        pass
    finally:
        await i(Yield)
def
set_my_exit_handler( handler: Callable[[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase], Union[bool, NoneType]]) -> bool:
def
set_exit_handler( handler: Callable[[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase], Union[bool, NoneType]], coro_id: Union[int, NoneType] = None) -> bool:
def set_exit_handler(handler: Callable[[CoroWrapperBase], Optional[bool]], coro_id: Optional[CoroID] = None) -> bool:
    """Register ``handler`` on a coroutine via ``add_on_coro_del_handler``.

    Args:
        handler: Callable taking the coroutine wrapper.
        coro_id: Target coroutine ID. When ``None``, the ID of the current
            scheduler's current interface is used.

    Returns:
        ``True`` once the handler is registered, ``False`` when the target
        coroutine cannot be determined or found. (Previously the success
        path fell through and returned ``None`` despite the ``bool``
        annotation.)
    """
    current_loop: CoroSchedulerType = current_coro_scheduler()
    if coro_id is None:
        i: Optional[Interface] = current_loop.current_interface()
        if i is None:
            return False

        coro_id = i.coro_id

    if coro_id is None:
        return False

    coro, _was_new_born, _new_born_index = current_loop.find_coro_by_id(coro_id)
    if coro is None:
        # NOTE(review): assumes find_coro_by_id reports an unknown ID with
        # None — confirm against the scheduler.
        return False

    cast(CoroWrapperBase, coro).add_on_coro_del_handler(handler)
    return True
def
kill_coro_children( coro: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase):
def
kill_my_children_on_exit() -> bool:
def
kill_children_on_exit(coro_id: Union[int, NoneType] = None) -> bool:
def kill_children_on_exit(coro_id: Optional[CoroID] = None) -> bool:
    """Register ``kill_coro_children`` on a coroutine via ``add_on_coro_del_handler``.

    Args:
        coro_id: Target coroutine ID. When ``None``, the ID of the current
            scheduler's current interface is used.

    Returns:
        ``True`` once the handler is registered, ``False`` when the target
        coroutine cannot be determined or found. (Previously the success
        path fell through and returned ``None`` despite the ``bool``
        annotation.)
    """
    current_loop: CoroSchedulerType = current_coro_scheduler()
    if coro_id is None:
        i: Optional[Interface] = current_loop.current_interface()
        if i is None:
            return False

        coro_id = i.coro_id

    if coro_id is None:
        return False

    coro, _was_new_born, _new_born_index = current_loop.find_coro_by_id(coro_id)
    if coro is None:
        # NOTE(review): assumes find_coro_by_id reports an unknown ID with
        # None — confirm against the scheduler.
        return False

    cast(CoroWrapperBase, coro).add_on_coro_del_handler(kill_coro_children)
    return True