cengal.parallel_execution.coroutines.coro_tools.wait_coro.versions.v_0.wait_coro
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['sync_coro', 'sync_coro_param']


from cengal.parallel_execution.coroutines.coro_scheduler import *
# from cengal.parallel_execution.coroutines.coro_standard_services import *
from typing import Union, Optional, Any, List, Tuple, Dict
from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
from cengal.code_flow_control.smart_values import ValueExistence
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PSCP
from cengal.parallel_execution.coroutines.coro_tools.prepare_loop import prepare_loop, prepare_fast_loop


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class LoopWasEndedBeforeResultWasComputed(Exception):
    """Raised by a ``sync_coro_param`` wrapper when it drives the scheduler
    itself and ``cs.iteration()`` reports a falsy value before the wrapped
    coroutine has produced its result.
    """


def sync_coro_param(
        result_required: bool = True,
        timeout: Optional[float] = None,
        kill_on_timeout: bool = True,
        tree: bool = True,
        cs: Optional[CoroSchedulerType] = None,
        prepare_own_loop_if_not_found: bool = False,
        own_loop_shold_be_fast_loop: bool = True,
        own_loop_setup_coro_worker: Optional[AnyWorker] = None,
        own_loop_setup_coro_worker_args_kwargs: Optional[Tuple[Tuple, Dict]] = None):
    """Decorator with arguments. Gives the ability to execute any decorated Cengal coroutine from plain sync/async
    code as a sync function. Can postpone execution to the actual loop when possible if None as an immediate result
    (no result) is acceptable. Can start its own loop if needed.

    Execution strategy of the produced wrapper:

    * No scheduler can be found (and none is prepared): the worker is executed directly.
    * Called from inside a greenlet coroutine: the worker is run through the ``WaitCoro`` service
      and its result is returned (or its exception is re-raised).
    * Called from inside the loop but not from a greenlet coroutine: the worker is executed directly
      when ``result_required`` is True; otherwise it is put to the loop and None is returned.
    * Called from outside the loop: the worker is put to the scheduler and the scheduler is iterated
      by the wrapper until the result is available.

    Args:
        result_required (bool, optional): When the wrapper is called inside the loop from a place where it can not
            wait (not a greenlet coroutine): True forces a direct execution of the worker, False puts the worker
            to the loop and makes the wrapper return None. Defaults to True.
        timeout (Optional[float], optional): Forwarded to ``WaitCoroRequest`` as its first argument. Defaults to None.
        kill_on_timeout (bool, optional): Forwarded to ``WaitCoroRequest`` as its second argument. Defaults to True.
        tree (bool, optional): Forwarded to ``WaitCoroRequest`` as its third argument. Defaults to True.
        cs (Optional[CoroSchedulerType], optional): Explicit scheduler to use. When None,
            ``get_available_coro_scheduler()`` is asked for one on every call. Defaults to None.
        prepare_own_loop_if_not_found (bool, optional): When True and no scheduler is available, a new one is
            prepared with ``prepare_fast_loop``/``prepare_loop``. Defaults to False.
        own_loop_shold_be_fast_loop (bool, optional): Selects ``prepare_fast_loop`` (True) or ``prepare_loop``
            (False) for the own loop. Defaults to True.
        own_loop_setup_coro_worker (Optional[AnyWorker], optional): Passed as the first argument to the selected
            loop preparation function. Defaults to None.
        own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): ``(args, kwargs)`` pair
            forwarded to the loop preparation function after the setup worker. Defaults to None.

    Raises:
        Exception: Any exception reported for the worker by the ``WaitCoro`` service is re-raised as is.
        LoopWasEndedBeforeResultWasComputed: The wrapper was iterating the scheduler by itself and
            ``cs.iteration()`` returned a falsy value before the result was ready.
        TypeError: Direct execution was required but the worker is an awaitable.
        TypeError: Direct execution was required but the worker is neither an awaitable nor a greenlet.

    Returns:
        Callable: A decorator which takes a coroutine worker and returns its synchronous wrapper.
    """
    # `cs` is rebound inside `wrapper`, which makes it local there; keep the
    # decorator-level value under a separate name so every call starts from it.
    cs0 = cs

    def sync_coro_decorator(coro_worker: Worker):
        def wrapper(*args, **kwargs):
            cs = cs0
            done: ValueExistence = ValueExistence()

            async def coro(i: Interface, done: ValueExistence, coro_worker: Worker, *args, **kwargs):
                # Runs the worker through WaitCoro and publishes its (coro_id, result, exception) record.
                results: List[Tuple[CoroID, Any, Union[None, Exception]]] = await i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
                done.value = results[0]

            direct_execution_required: bool = False
            if cs is None:
                cs = get_available_coro_scheduler()

            if (cs is None) and prepare_own_loop_if_not_found:
                # The setup worker's arguments live in their own names: rebinding
                # `args`/`kwargs` here would discard the caller's arguments for `coro_worker`.
                if own_loop_setup_coro_worker_args_kwargs is None:
                    setup_args: Tuple = tuple()
                    setup_kwargs: Dict = dict()
                else:
                    setup_args, setup_kwargs = own_loop_setup_coro_worker_args_kwargs

                if own_loop_shold_be_fast_loop:
                    cs, _ = prepare_fast_loop(own_loop_setup_coro_worker, *setup_args, **setup_kwargs)
                else:
                    cs, _ = prepare_loop(own_loop_setup_coro_worker, *setup_args, **setup_kwargs)

            if cs is None:
                direct_execution_required = True
            else:
                i: Optional[Interface] = None
                inside_loop: bool = False
                inside_coro: bool = False
                try:
                    i = current_interface()
                    inside_loop = True
                    if i is not None:
                        inside_coro = True
                except OutsideCoroSchedulerContext:
                    pass

                if inside_loop:
                    if inside_coro and isinstance(i, InterfaceGreenlet):
                        # A greenlet interface is called synchronously (no `await`).
                        results: List[Tuple[CoroID, Any, Union[None, Exception]]] = i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
                        _, result, exception = results[0]
                        if exception is not None:
                            raise exception

                        return result
                else:
                    # Outside of the loop: schedule the helper coroutine and drive the scheduler manually.
                    put_coro_to(get_interface_and_loop_with_explicit_loop(cs), coro, done, coro_worker, *args, **kwargs)
                    in_work = True
                    while in_work and (not done):
                        in_work = cs.iteration()

                    if not done:
                        raise LoopWasEndedBeforeResultWasComputed

                    _, result, exception = done.value
                    if exception is not None:
                        raise exception

                    # Return the computed result instead of falling through and
                    # executing/scheduling the worker a second time.
                    return result

                # Reached only inside the loop when waiting for the worker is impossible.
                if result_required:
                    direct_execution_required = True
                else:
                    put_coro_to(interface_and_loop_with_explicit_loop(cs), coro_worker, *args, **kwargs)

            if direct_execution_required:
                coro_worker_type: CoroType = find_coro_type(coro_worker)
                if CoroType.greenlet == coro_worker_type:
                    return coro_worker(InterfaceFake(None, None), *args, **kwargs)
                elif CoroType.awaitable == coro_worker_type:
                    raise TypeError(f'Can not directly execute awaitable {coro_worker} from non async environment')
                else:
                    raise TypeError(f'{coro_worker} is neither an awaitable nor a greenlet')

        return wrapper
    return sync_coro_decorator


def sync_coro(coro_worker: Worker):
    """Decorator without arguments: wraps ``coro_worker`` with ``sync_coro_param()`` using all default settings."""
    return sync_coro_param()(coro_worker)
def sync_coro_param(
        result_required: bool = True,
        timeout: Optional[float] = None,
        kill_on_timeout: bool = True,
        tree: bool = True,
        cs: Optional[CoroSchedulerType] = None,
        prepare_own_loop_if_not_found: bool = False,
        own_loop_shold_be_fast_loop: bool = True,
        own_loop_setup_coro_worker: Optional[AnyWorker] = None,
        own_loop_setup_coro_worker_args_kwargs: Optional[Tuple[Tuple, Dict]] = None):
    """Decorator with arguments. Gives the ability to execute any decorated Cengal coroutine from plain sync/async
    code as a sync function. Can postpone execution to the actual loop when possible if None as an immediate result
    (no result) is acceptable. Can start its own loop if needed.

    Execution strategy of the produced wrapper:

    * No scheduler can be found (and none is prepared): the worker is executed directly.
    * Called from inside a greenlet coroutine: the worker is run through the ``WaitCoro`` service
      and its result is returned (or its exception is re-raised).
    * Called from inside the loop but not from a greenlet coroutine: the worker is executed directly
      when ``result_required`` is True; otherwise it is put to the loop and None is returned.
    * Called from outside the loop: the worker is put to the scheduler and the scheduler is iterated
      by the wrapper until the result is available.

    Args:
        result_required (bool, optional): When the wrapper is called inside the loop from a place where it can not
            wait (not a greenlet coroutine): True forces a direct execution of the worker, False puts the worker
            to the loop and makes the wrapper return None. Defaults to True.
        timeout (Optional[float], optional): Forwarded to ``WaitCoroRequest`` as its first argument. Defaults to None.
        kill_on_timeout (bool, optional): Forwarded to ``WaitCoroRequest`` as its second argument. Defaults to True.
        tree (bool, optional): Forwarded to ``WaitCoroRequest`` as its third argument. Defaults to True.
        cs (Optional[CoroSchedulerType], optional): Explicit scheduler to use. When None,
            ``get_available_coro_scheduler()`` is asked for one on every call. Defaults to None.
        prepare_own_loop_if_not_found (bool, optional): When True and no scheduler is available, a new one is
            prepared with ``prepare_fast_loop``/``prepare_loop``. Defaults to False.
        own_loop_shold_be_fast_loop (bool, optional): Selects ``prepare_fast_loop`` (True) or ``prepare_loop``
            (False) for the own loop. Defaults to True.
        own_loop_setup_coro_worker (Optional[AnyWorker], optional): Passed as the first argument to the selected
            loop preparation function. Defaults to None.
        own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): ``(args, kwargs)`` pair
            forwarded to the loop preparation function after the setup worker. Defaults to None.

    Raises:
        Exception: Any exception reported for the worker by the ``WaitCoro`` service is re-raised as is.
        LoopWasEndedBeforeResultWasComputed: The wrapper was iterating the scheduler by itself and
            ``cs.iteration()`` returned a falsy value before the result was ready.
        TypeError: Direct execution was required but the worker is an awaitable.
        TypeError: Direct execution was required but the worker is neither an awaitable nor a greenlet.

    Returns:
        Callable: A decorator which takes a coroutine worker and returns its synchronous wrapper.
    """
    # `cs` is rebound inside `wrapper`, which makes it local there; keep the
    # decorator-level value under a separate name so every call starts from it.
    cs0 = cs

    def sync_coro_decorator(coro_worker: Worker):
        def wrapper(*args, **kwargs):
            cs = cs0
            done: ValueExistence = ValueExistence()

            async def coro(i: Interface, done: ValueExistence, coro_worker: Worker, *args, **kwargs):
                # Runs the worker through WaitCoro and publishes its (coro_id, result, exception) record.
                results: List[Tuple[CoroID, Any, Union[None, Exception]]] = await i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
                done.value = results[0]

            direct_execution_required: bool = False
            if cs is None:
                cs = get_available_coro_scheduler()

            if (cs is None) and prepare_own_loop_if_not_found:
                # The setup worker's arguments live in their own names: rebinding
                # `args`/`kwargs` here would discard the caller's arguments for `coro_worker`.
                if own_loop_setup_coro_worker_args_kwargs is None:
                    setup_args: Tuple = tuple()
                    setup_kwargs: Dict = dict()
                else:
                    setup_args, setup_kwargs = own_loop_setup_coro_worker_args_kwargs

                if own_loop_shold_be_fast_loop:
                    cs, _ = prepare_fast_loop(own_loop_setup_coro_worker, *setup_args, **setup_kwargs)
                else:
                    cs, _ = prepare_loop(own_loop_setup_coro_worker, *setup_args, **setup_kwargs)

            if cs is None:
                direct_execution_required = True
            else:
                i: Optional[Interface] = None
                inside_loop: bool = False
                inside_coro: bool = False
                try:
                    i = current_interface()
                    inside_loop = True
                    if i is not None:
                        inside_coro = True
                except OutsideCoroSchedulerContext:
                    pass

                if inside_loop:
                    if inside_coro and isinstance(i, InterfaceGreenlet):
                        # A greenlet interface is called synchronously (no `await`).
                        results: List[Tuple[CoroID, Any, Union[None, Exception]]] = i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
                        _, result, exception = results[0]
                        if exception is not None:
                            raise exception

                        return result
                else:
                    # Outside of the loop: schedule the helper coroutine and drive the scheduler manually.
                    put_coro_to(get_interface_and_loop_with_explicit_loop(cs), coro, done, coro_worker, *args, **kwargs)
                    in_work = True
                    while in_work and (not done):
                        in_work = cs.iteration()

                    if not done:
                        raise LoopWasEndedBeforeResultWasComputed

                    _, result, exception = done.value
                    if exception is not None:
                        raise exception

                    # Return the computed result instead of falling through and
                    # executing/scheduling the worker a second time.
                    return result

                # Reached only inside the loop when waiting for the worker is impossible.
                if result_required:
                    direct_execution_required = True
                else:
                    put_coro_to(interface_and_loop_with_explicit_loop(cs), coro_worker, *args, **kwargs)

            if direct_execution_required:
                coro_worker_type: CoroType = find_coro_type(coro_worker)
                if CoroType.greenlet == coro_worker_type:
                    return coro_worker(InterfaceFake(None, None), *args, **kwargs)
                elif CoroType.awaitable == coro_worker_type:
                    raise TypeError(f'Can not directly execute awaitable {coro_worker} from non async environment')
                else:
                    raise TypeError(f'{coro_worker} is neither an awaitable nor a greenlet')

        return wrapper
    return sync_coro_decorator
Decorator with arguments. Gives the ability to execute any decorated Cengal coroutine from plain sync/async code as a sync function. Can postpone execution to the actual loop when possible if None as an immediate result (no result) is acceptable. Can start its own loop if needed.
Args: result_required (bool, optional): _description_. Defaults to True. timeout (Optional[float], optional): _description_. Defaults to None. kill_on_timeout (bool, optional): _description_. Defaults to True. tree (bool, optional): _description_. Defaults to True. cs (Optional[CoroSchedulerType], optional): _description_. Defaults to None. prepare_own_loop_if_not_found (bool, optional): _description_. Defaults to False. own_loop_shold_be_fast_loop (bool, optional): _description_. Defaults to True. own_loop_setup_coro_worker (Optional[AnyWorker], optional): _description_. Defaults to None. own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): _description_. Defaults to None.
Raises: exception: _description_ exception: _description_ LoopWasEndedBeforeResultWasComputed: _description_ TypeError: _description_ TypeError: _description_
Returns: _type_: _description_