cengal.parallel_execution.coroutines.coro_tools.prepare_loop.versions.v_0.prepare_loop
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Create a primary ``CoroScheduler`` and run an optional setup worker in it.

Two entry points are exported:

* ``prepare_loop`` - switches every optional scheduler feature on.
* ``prepare_fast_loop`` - switches the same features off.

Both drive the scheduler by hand (``cs.iteration()``) until the setup
coroutine is finished and then hand the scheduler back to the caller.
"""


__all__ = ['prepare_loop', 'prepare_fast_loop']


from asyncio import AbstractEventLoop
from asyncio import Task as asyncio_Task
from asyncio import get_event_loop, get_running_loop

from cengal.code_flow_control.smart_values import ValueExistence
from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import (
    PutCoro, PutCoroRequest)
from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import \
    RunCoro
from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import (
    CoroutineNotFoundError, WaitCoro, WaitCoroRequest)
from cengal.parallel_execution.coroutines.coro_standard_services.instance import Instance, InstanceRequest
from cengal.parallel_execution.coroutines.coro_tools.await_coro import (
    RunSchedulerInAsyncioLoop, await_coro_fast)
from cengal.time_management.sleep_tools import get_usable_min_sleep_interval
from typing import Tuple, Any, Union, Optional


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class LoopWasEndedBeforeSetupWasPrepared(Exception):
    """Raised when the scheduler loop stops before the setup coroutine has finished."""


def _prepare_loop_impl(features_enabled: bool, setup_coro_worker: Optional[AnyWorker], args: tuple, kwargs: dict) -> Tuple[CoroSchedulerType, Any]:
    """Shared implementation behind ``prepare_loop`` and ``prepare_fast_loop``.

    Args:
        features_enabled: single switch passed to ``turn_on_embedded_mode``,
            the three measurement/history setters, ``set_global_on_start_handlers``
            and the ``PutCoro`` tree-monitoring request.
        setup_coro_worker: worker run through ``RunCoro`` inside the new
            scheduler; ``None`` means "run nothing".
        args: positional arguments forwarded to ``setup_coro_worker``.
        kwargs: keyword arguments forwarded to ``setup_coro_worker``.

    Raises:
        LoopWasEndedBeforeSetupWasPrepared: ``cs.iteration()`` reported that the
            loop is over before the setup coroutine was deleted.
        Exception: whatever exception the setup coroutine ended with.

    Returns:
        The scheduler and the setup coroutine's ``last_result``.
    """
    cs = CoroScheduler()
    set_primary_coro_scheduler(cs)
    cs.turn_on_embedded_mode(features_enabled)
    cs.set_coro_time_measurement(features_enabled)
    cs.set_coro_history_gathering(features_enabled)
    cs.set_loop_iteration_time_measurement(features_enabled)
    cs.set_global_on_start_handlers(features_enabled)
    # These three are the same for both public entry points.
    cs.suppress_coro_exceptions = True
    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
    cs.use_internal_sleep = True

    async def initial_setup(i: Interface):
        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(features_enabled))

    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)

    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
        try:
            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
        except CoroutineNotFoundError:
            # Deliberately ignored: there is nothing left to wait for.
            pass

        if setup_coro_worker is None:
            return None

        return await i(RunCoro, setup_coro_worker, *args, **kwargs)

    main_coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, setup_coro_worker, *args, **kwargs)
    outcome: ValueExistence = ValueExistence()

    def gather(finished_coro: CoroWrapperBase) -> bool:
        # Called when `main` is deleted; keep what it produced.
        outcome.value = (finished_coro.last_result, finished_coro.exception)
        return True

    main_coro.add_on_coro_del_handler(gather)

    # Drive the scheduler manually until `gather` has stored an outcome
    # or the scheduler reports that it has nothing more to do.
    in_work: bool = True
    while in_work and (not outcome):
        in_work = cs.iteration()

    if not outcome:
        raise LoopWasEndedBeforeSetupWasPrepared

    setup_result, setup_exception = outcome.value
    if setup_exception is not None:
        raise setup_exception

    return cs, setup_result


def prepare_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
    """Create a primary scheduler with all optional features on and run the setup worker.

    Embedded mode, coroutine time measurement, coroutine history gathering,
    loop-iteration time measurement, global on-start handlers and ``PutCoro``
    tree monitoring are all switched on.

    Args:
        setup_coro_worker: worker run through ``RunCoro`` inside the new
            scheduler; ``None`` means "run nothing".
        *args: forwarded to ``setup_coro_worker``.
        **kwargs: forwarded to ``setup_coro_worker``.

    Raises:
        LoopWasEndedBeforeSetupWasPrepared: the loop ended before the setup
            coroutine finished.
        Exception: whatever exception the setup coroutine ended with.

    Returns:
        ``(scheduler, setup_result)``.
        NOTE(review): the annotation advertises ``ValueExistence[Any]`` for the
        second item, but the code returns the unwrapped ``last_result`` of the
        setup coroutine (``None`` when no worker was given).
    """
    return _prepare_loop_impl(True, setup_coro_worker, args, kwargs)


def prepare_fast_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
    """Create a primary scheduler with all optional features off and run the setup worker.

    Same procedure as ``prepare_loop`` but embedded mode, the measurements,
    history gathering, global on-start handlers and tree monitoring are
    switched off. The original author's note states this is about 30% faster
    than ``prepare_loop()``.

    Args:
        setup_coro_worker: worker run through ``RunCoro`` inside the new
            scheduler; ``None`` means "run nothing".
        *args: forwarded to ``setup_coro_worker``.
        **kwargs: forwarded to ``setup_coro_worker``.

    Raises:
        LoopWasEndedBeforeSetupWasPrepared: the loop ended before the setup
            coroutine finished.
        Exception: whatever exception the setup coroutine ended with.

    Returns:
        ``(scheduler, setup_result)``.
        NOTE(review): the annotation advertises ``ValueExistence[Any]`` for the
        second item, but the code returns the unwrapped ``last_result`` of the
        setup coroutine (``None`` when no worker was given).
    """
    return _prepare_loop_impl(False, setup_coro_worker, args, kwargs)
def
prepare_loop( setup_coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]], NoneType], *args, **kwargs) -> Tuple[Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[Any]]:
def prepare_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
    """Create a primary scheduler with all optional features on and run the setup worker.

    A new ``CoroScheduler`` is registered through ``set_primary_coro_scheduler``
    with embedded mode, coroutine time measurement, history gathering,
    loop-iteration time measurement and global on-start handlers switched on.
    The scheduler is then iterated by hand until the setup coroutine is done.

    Args:
        setup_coro_worker: worker run through ``RunCoro``; ``None`` runs nothing.
        *args: forwarded to ``setup_coro_worker``.
        **kwargs: forwarded to ``setup_coro_worker``.

    Raises:
        LoopWasEndedBeforeSetupWasPrepared: ``cs.iteration()`` returned a falsy
            value before the setup coroutine was deleted.
        Exception: whatever exception the setup coroutine ended with.

    Returns:
        The scheduler and the ``last_result`` of the setup coroutine.
    """
    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    scheduler.turn_on_embedded_mode(True)
    scheduler.set_coro_time_measurement(True)
    scheduler.set_coro_history_gathering(True)
    scheduler.set_loop_iteration_time_measurement(True)
    scheduler.set_global_on_start_handlers(True)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = True

    async def initial_setup(i: Interface):
        # Ask the PutCoro service to switch tree monitoring on.
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(True)
        await i(PutCoro, monitoring_request)

    setup_wrapper: CoroWrapperBase = scheduler.put_coro(initial_setup)

    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            # Nothing to wait for; carry on with the worker.
            pass

        if setup_coro_worker is not None:
            return await i(RunCoro, setup_coro_worker, *args, **kwargs)

        return None

    main_wrapper: CoroWrapperBase = scheduler.put_coro(main, setup_wrapper, setup_coro_worker, *args, **kwargs)
    outcome: ValueExistence = ValueExistence()

    def remember_outcome(finished: CoroWrapperBase) -> bool:
        # Deletion handler of `main`: store its result and exception as a pair.
        outcome.value = (finished.last_result, finished.exception)
        return True

    main_wrapper.add_on_coro_del_handler(remember_outcome)

    # Iterate until an outcome has been stored or the scheduler is out of work.
    while not outcome:
        if not scheduler.iteration():
            break

    if not outcome:
        raise LoopWasEndedBeforeSetupWasPrepared

    setup_result, setup_error = outcome.value
    if setup_error is not None:
        raise setup_error

    return scheduler, setup_result
def
prepare_fast_loop( setup_coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]], NoneType], *args, **kwargs) -> Tuple[Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[Any]]:
def prepare_fast_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
    """Create a primary scheduler with all optional features off and run the setup worker.

    A new ``CoroScheduler`` is registered through ``set_primary_coro_scheduler``
    with embedded mode, coroutine time measurement, history gathering,
    loop-iteration time measurement and global on-start handlers switched off.
    The original author's note states this is about 30% faster than
    ``prepare_loop()``.

    Args:
        setup_coro_worker: worker run through ``RunCoro``; ``None`` runs nothing.
        *args: forwarded to ``setup_coro_worker``.
        **kwargs: forwarded to ``setup_coro_worker``.

    Raises:
        LoopWasEndedBeforeSetupWasPrepared: ``cs.iteration()`` returned a falsy
            value before the setup coroutine was deleted.
        Exception: whatever exception the setup coroutine ended with.

    Returns:
        The scheduler and the ``last_result`` of the setup coroutine.
    """
    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    scheduler.turn_on_embedded_mode(False)
    scheduler.set_coro_time_measurement(False)
    scheduler.set_coro_history_gathering(False)
    scheduler.set_loop_iteration_time_measurement(False)
    scheduler.set_global_on_start_handlers(False)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = True

    async def initial_setup(i: Interface):
        # Ask the PutCoro service to switch tree monitoring off.
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(False)
        await i(PutCoro, monitoring_request)

    setup_wrapper: CoroWrapperBase = scheduler.put_coro(initial_setup)

    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            # Nothing to wait for; carry on with the worker.
            pass

        if setup_coro_worker is not None:
            return await i(RunCoro, setup_coro_worker, *args, **kwargs)

        return None

    main_wrapper: CoroWrapperBase = scheduler.put_coro(main, setup_wrapper, setup_coro_worker, *args, **kwargs)
    outcome: ValueExistence = ValueExistence()

    def remember_outcome(finished: CoroWrapperBase) -> bool:
        # Deletion handler of `main`: store its result and exception as a pair.
        outcome.value = (finished.last_result, finished.exception)
        return True

    main_wrapper.add_on_coro_del_handler(remember_outcome)

    # Iterate until an outcome has been stored or the scheduler is out of work.
    while not outcome:
        if not scheduler.iteration():
            break

    if not outcome:
        raise LoopWasEndedBeforeSetupWasPrepared

    setup_result, setup_error = outcome.value
    if setup_error is not None:
        raise setup_error

    return scheduler, setup_result
30% faster than prepare_loop()
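Args: setup_coro_worker (Optional[AnyWorker]): worker that is run inside the new scheduler via RunCoro; when None, nothing is run. *args, **kwargs: forwarded to setup_coro_worker.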
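Raises: LoopWasEndedBeforeSetupWasPrepared: the scheduler loop ended before the setup coroutine finished. Exception: any exception the setup coroutine ended with is re-raised.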
Returns: tuple: the prepared scheduler and the value returned by setup_coro_worker (None when no worker was given).