cengal.parallel_execution.coroutines.coro_tools.run_in_loop.versions.v_0.run_in_loop
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['run_in_loop', 'arun_in_loop', 'arun_in_fast_loop', 'run_in_fast_loop'] 20 21 22from asyncio import AbstractEventLoop 23from asyncio import Task as asyncio_Task 24from asyncio import get_event_loop, get_running_loop 25 26from cengal.code_flow_control.smart_values import ValueExistence 27from cengal.parallel_execution.coroutines.coro_scheduler import * 28from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import ( 29 PutCoro, PutCoroRequest) 30from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import \ 31 RunCoro 32from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import ( 33 CoroutineNotFoundError, WaitCoro, WaitCoroRequest) 34from cengal.parallel_execution.coroutines.coro_standard_services.instance import Instance, InstanceRequest 35from cengal.parallel_execution.coroutines.coro_tools.await_coro import ( 36 RunSchedulerInAsyncioLoop, await_coro_fast) 37from cengal.time_management.sleep_tools import get_usable_min_sleep_interval 38 39""" 40Module Docstring 41Docstrings: http://www.python.org/dev/peps/pep-0257/ 42""" 43 44__author__ = "ButenkoMS <gtalk@butenkoms.space>" 45__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 46__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 47__license__ = "Apache License, Version 2.0" 48__version__ = "4.4.1" 49__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 50__email__ = "gtalk@butenkoms.space" 51# __status__ = "Prototype" 52__status__ = "Development" 53# __status__ = "Production" 54 55 56async def arun_in_loop(coro_worker: AnyWorker, *args, **kwargs): 57 async_loop = get_running_loop() 58 cs = CoroScheduler() 59 set_primary_coro_scheduler(cs) 60 cs.turn_on_embedded_mode(True) 61 cs.set_coro_time_measurement(True) 62 cs.set_coro_history_gathering(True) 63 cs.set_loop_iteration_time_measurement(True) 64 cs.set_global_on_start_handlers(True) 65 cs.suppress_coro_exceptions = True 66 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 67 cs.use_internal_sleep = False 68 69 async def initial_setup(i: Interface): 70 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True)) 71 72 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 73 74 async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 75 try: 76 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 77 except CoroutineNotFoundError: 78 pass 79 80 await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs)) 81 return await i(RunCoro, coro_worker, *args, **kwargs) 82 83 rs = RunSchedulerInAsyncioLoop(cs, 0.020, async_loop, 6) 84 rs.make_idle_when_possible = True 85 rs.need_to_stop_when_possible = False 86 rs.register() 87 return await await_coro_fast(async_loop, cs, CoroType.auto, main, rs, initial_setup_coro, coro_worker, *args, **kwargs) 88 89 90def run_in_loop(coro_worker: AnyWorker, *args, **kwargs): 91 cs = CoroScheduler() 92 set_primary_coro_scheduler(cs) 93 cs.turn_on_embedded_mode(True) 94 cs.set_coro_time_measurement(True) 95 cs.set_coro_history_gathering(True) 96 cs.set_loop_iteration_time_measurement(True) 97 cs.set_global_on_start_handlers(True) 98 cs.suppress_coro_exceptions = True 99 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 100 cs.use_internal_sleep = True 101 102 async def initial_setup(i: Interface): 103 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True)) 104 105 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 106 107 async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 108 try: 109 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 110 except CoroutineNotFoundError: 111 pass 112 113 return await i(RunCoro, coro_worker, *args, **kwargs) 114 115 coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs) 116 result: ValueExistence = ValueExistence() 117 exception: ValueExistence = ValueExistence() 118 119 def gather(coro: CoroWrapperBase) -> bool: 120 result.value = coro.last_result 121 exception.value = coro.exception 122 return True 123 124 coro.add_on_coro_del_handler(gather) 125 cs.loop() 126 if exception.value is not None: 127 raise exception.value 128 129 return result.value 130 131 132async def arun_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs): 133 """30% faster than arun_in_loop() 134 135 Args: 136 coro_worker (AnyWorker): _description_ 137 138 Returns: 139 _type_: _description_ 140 """ 141 async_loop = get_running_loop() 142 cs = CoroScheduler() 143 set_primary_coro_scheduler(cs) 144 cs.turn_on_embedded_mode(False) 145 cs.set_coro_time_measurement(False) 146 cs.set_coro_history_gathering(False) 147 cs.set_loop_iteration_time_measurement(False) 148 cs.set_global_on_start_handlers(False) 149 cs.suppress_coro_exceptions = True 150 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 151 cs.use_internal_sleep = False 152 153 async def initial_setup(i: Interface): 154 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False)) 155 156 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 157 158 async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 159 try: 160 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 161 except CoroutineNotFoundError: 162 pass 163 164 await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs)) 165 return await i(RunCoro, coro_worker, *args, **kwargs) 166 167 rs = RunSchedulerInAsyncioLoop(cs, 0.020, async_loop, 6) 168 rs.make_idle_when_possible = True 169 rs.need_to_stop_when_possible = False 170 rs.register() 171 return await await_coro_fast(async_loop, cs, CoroType.auto, main, rs, initial_setup_coro, coro_worker, *args, **kwargs) 172 173 174def run_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs): 175 """30% faster than run_in_loop() 176 177 Args: 178 coro_worker (AnyWorker): _description_ 179 180 Raises: 181 exception.value: _description_ 182 183 Returns: 184 _type_: _description_ 185 """ 186 cs = CoroScheduler() 187 set_primary_coro_scheduler(cs) 188 cs.turn_on_embedded_mode(False) 189 cs.set_coro_time_measurement(False) 190 cs.set_coro_history_gathering(False) 191 cs.set_loop_iteration_time_measurement(False) 192 cs.set_global_on_start_handlers(False) 193 cs.suppress_coro_exceptions = True 194 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 195 cs.use_internal_sleep = True 196 197 async def initial_setup(i: Interface): 198 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False)) 199 200 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 201 202 async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 203 try: 204 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 205 except CoroutineNotFoundError: 206 pass 207 208 return await i(RunCoro, coro_worker, *args, **kwargs) 209 210 coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs) 211 result: ValueExistence = ValueExistence() 212 exception: ValueExistence = ValueExistence() 213 214 def gather(coro: CoroWrapperBase) -> bool: 215 result.value = coro.last_result 216 exception.value = coro.exception 217 return True 218 219 coro.add_on_coro_del_handler(gather) 220 cs.loop() 221 if exception.value is not None: 222 raise exception.value 223 224 return result.value
def
run_in_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
91def run_in_loop(coro_worker: AnyWorker, *args, **kwargs): 92 cs = CoroScheduler() 93 set_primary_coro_scheduler(cs) 94 cs.turn_on_embedded_mode(True) 95 cs.set_coro_time_measurement(True) 96 cs.set_coro_history_gathering(True) 97 cs.set_loop_iteration_time_measurement(True) 98 cs.set_global_on_start_handlers(True) 99 cs.suppress_coro_exceptions = True 100 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 101 cs.use_internal_sleep = True 102 103 async def initial_setup(i: Interface): 104 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True)) 105 106 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 107 108 async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 109 try: 110 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 111 except CoroutineNotFoundError: 112 pass 113 114 return await i(RunCoro, coro_worker, *args, **kwargs) 115 116 coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs) 117 result: ValueExistence = ValueExistence() 118 exception: ValueExistence = ValueExistence() 119 120 def gather(coro: CoroWrapperBase) -> bool: 121 result.value = coro.last_result 122 exception.value = coro.exception 123 return True 124 125 coro.add_on_coro_del_handler(gather) 126 cs.loop() 127 if exception.value is not None: 128 raise exception.value 129 130 return result.value
async def
arun_in_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
57async def arun_in_loop(coro_worker: AnyWorker, *args, **kwargs): 58 async_loop = get_running_loop() 59 cs = CoroScheduler() 60 set_primary_coro_scheduler(cs) 61 cs.turn_on_embedded_mode(True) 62 cs.set_coro_time_measurement(True) 63 cs.set_coro_history_gathering(True) 64 cs.set_loop_iteration_time_measurement(True) 65 cs.set_global_on_start_handlers(True) 66 cs.suppress_coro_exceptions = True 67 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 68 cs.use_internal_sleep = False 69 70 async def initial_setup(i: Interface): 71 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True)) 72 73 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 74 75 async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 76 try: 77 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 78 except CoroutineNotFoundError: 79 pass 80 81 await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs)) 82 return await i(RunCoro, coro_worker, *args, **kwargs) 83 84 rs = RunSchedulerInAsyncioLoop(cs, 0.020, async_loop, 6) 85 rs.make_idle_when_possible = True 86 rs.need_to_stop_when_possible = False 87 rs.register() 88 return await await_coro_fast(async_loop, cs, CoroType.auto, main, rs, initial_setup_coro, coro_worker, *args, **kwargs)
async def
arun_in_fast_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
133async def arun_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs): 134 """30% faster than arun_in_loop() 135 136 Args: 137 coro_worker (AnyWorker): _description_ 138 139 Returns: 140 _type_: _description_ 141 """ 142 async_loop = get_running_loop() 143 cs = CoroScheduler() 144 set_primary_coro_scheduler(cs) 145 cs.turn_on_embedded_mode(False) 146 cs.set_coro_time_measurement(False) 147 cs.set_coro_history_gathering(False) 148 cs.set_loop_iteration_time_measurement(False) 149 cs.set_global_on_start_handlers(False) 150 cs.suppress_coro_exceptions = True 151 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 152 cs.use_internal_sleep = False 153 154 async def initial_setup(i: Interface): 155 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False)) 156 157 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 158 159 async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 160 try: 161 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 162 except CoroutineNotFoundError: 163 pass 164 165 await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs)) 166 return await i(RunCoro, coro_worker, *args, **kwargs) 167 168 rs = RunSchedulerInAsyncioLoop(cs, 0.020, async_loop, 6) 169 rs.make_idle_when_possible = True 170 rs.need_to_stop_when_possible = False 171 rs.register() 172 return await await_coro_fast(async_loop, cs, CoroType.auto, main, rs, initial_setup_coro, coro_worker, *args, **kwargs)
30% faster than arun_in_loop()
Args: coro_worker (AnyWorker): _description_
Returns: _type_: _description_
def
run_in_fast_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
175def run_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs): 176 """30% faster than run_in_loop() 177 178 Args: 179 coro_worker (AnyWorker): _description_ 180 181 Raises: 182 exception.value: _description_ 183 184 Returns: 185 _type_: _description_ 186 """ 187 cs = CoroScheduler() 188 set_primary_coro_scheduler(cs) 189 cs.turn_on_embedded_mode(False) 190 cs.set_coro_time_measurement(False) 191 cs.set_coro_history_gathering(False) 192 cs.set_loop_iteration_time_measurement(False) 193 cs.set_global_on_start_handlers(False) 194 cs.suppress_coro_exceptions = True 195 cs.suppress_warnings_about_responses_to_not_existant_coroutines = True 196 cs.use_internal_sleep = True 197 198 async def initial_setup(i: Interface): 199 await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False)) 200 201 initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup) 202 203 async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs): 204 try: 205 await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id)) 206 except CoroutineNotFoundError: 207 pass 208 209 return await i(RunCoro, coro_worker, *args, **kwargs) 210 211 coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs) 212 result: ValueExistence = ValueExistence() 213 exception: ValueExistence = ValueExistence() 214 215 def gather(coro: CoroWrapperBase) -> bool: 216 result.value = coro.last_result 217 exception.value = coro.exception 218 return True 219 220 coro.add_on_coro_del_handler(gather) 221 cs.loop() 222 if exception.value is not None: 223 raise exception.value 224 225 return result.value
30% faster than run_in_loop()
Args: coro_worker (AnyWorker): _description_
Raises: exception.value: _description_
Returns: _type_: _description_