cengal.parallel_execution.coroutines.coro_tools.run_in_loop.versions.v_0.run_in_loop

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['run_in_loop', 'arun_in_loop', 'arun_in_fast_loop', 'run_in_fast_loop']
 20
 21
 22from asyncio import AbstractEventLoop
 23from asyncio import Task as asyncio_Task
 24from asyncio import get_event_loop, get_running_loop
 25
 26from cengal.code_flow_control.smart_values import ValueExistence
 27from cengal.parallel_execution.coroutines.coro_scheduler import *
 28from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import (
 29    PutCoro, PutCoroRequest)
 30from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import \
 31    RunCoro
 32from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import (
 33    CoroutineNotFoundError, WaitCoro, WaitCoroRequest)
 34from cengal.parallel_execution.coroutines.coro_standard_services.instance import Instance, InstanceRequest
 35from cengal.parallel_execution.coroutines.coro_tools.await_coro import (
 36    RunSchedulerInAsyncioLoop, await_coro_fast)
 37from cengal.time_management.sleep_tools import get_usable_min_sleep_interval
 38
 39"""
 40Module Docstring
 41Docstrings: http://www.python.org/dev/peps/pep-0257/
 42"""
 43
 44__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 45__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 46__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 47__license__ = "Apache License, Version 2.0"
 48__version__ = "4.4.1"
 49__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 50__email__ = "gtalk@butenkoms.space"
 51# __status__ = "Prototype"
 52__status__ = "Development"
 53# __status__ = "Production"
 54
 55
 56async def arun_in_loop(coro_worker: AnyWorker, *args, **kwargs):
 57    async_loop = get_running_loop()
 58    cs = CoroScheduler()
 59    set_primary_coro_scheduler(cs)
 60    cs.turn_on_embedded_mode(True)
 61    cs.set_coro_time_measurement(True)
 62    cs.set_coro_history_gathering(True)
 63    cs.set_loop_iteration_time_measurement(True)
 64    cs.set_global_on_start_handlers(True)
 65    cs.suppress_coro_exceptions = True
 66    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
 67    cs.use_internal_sleep = False
 68
 69    async def initial_setup(i: Interface):
 70        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True))
 71
 72    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
 73
 74    async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
 75        try:
 76            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
 77        except CoroutineNotFoundError:
 78            pass
 79
 80        await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs))
 81        return await i(RunCoro, coro_worker, *args, **kwargs)
 82
 83    rs = RunSchedulerInAsyncioLoop(cs, 0.020, async_loop, 6)
 84    rs.make_idle_when_possible = True
 85    rs.need_to_stop_when_possible = False
 86    rs.register()
 87    return await await_coro_fast(async_loop, cs, CoroType.auto, main, rs, initial_setup_coro, coro_worker, *args, **kwargs)
 88
 89
 90def run_in_loop(coro_worker: AnyWorker, *args, **kwargs):
 91    cs = CoroScheduler()
 92    set_primary_coro_scheduler(cs)
 93    cs.turn_on_embedded_mode(True)
 94    cs.set_coro_time_measurement(True)
 95    cs.set_coro_history_gathering(True)
 96    cs.set_loop_iteration_time_measurement(True)
 97    cs.set_global_on_start_handlers(True)
 98    cs.suppress_coro_exceptions = True
 99    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
100    cs.use_internal_sleep = True
101
102    async def initial_setup(i: Interface):
103        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True))
104
105    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
106
107    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
108        try:
109            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
110        except CoroutineNotFoundError:
111            pass
112
113        return await i(RunCoro, coro_worker, *args, **kwargs)
114
115    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs)
116    result: ValueExistence = ValueExistence()
117    exception: ValueExistence = ValueExistence()
118
119    def gather(coro: CoroWrapperBase) -> bool:
120        result.value = coro.last_result
121        exception.value = coro.exception
122        return True
123
124    coro.add_on_coro_del_handler(gather)
125    cs.loop()
126    if exception.value is not None:
127        raise exception.value
128
129    return result.value
130
131
async def arun_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs):
    """Run ``coro_worker`` on a new CoroScheduler with the optional features off.

    Same flow as ``arun_in_loop()``, but embedded mode, coroutine time
    measurement, history gathering, loop-iteration time measurement, global
    on-start handlers and tree monitoring are all switched off. The original
    author reports this as 30% faster than ``arun_in_loop()``.

    Args:
        coro_worker (AnyWorker): worker handed to the RunCoro service.
        *args: positional arguments passed to RunCoro together with the worker.
        **kwargs: keyword arguments passed to RunCoro together with the worker.

    Returns:
        Whatever ``await_coro_fast`` yields for the inner ``main`` coroutine,
        which itself returns the RunCoro result for ``coro_worker``.
    """
    loop = get_running_loop()

    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    # Same switches, same order as individual calls; every one is set to False.
    for switch in (
        scheduler.turn_on_embedded_mode,
        scheduler.set_coro_time_measurement,
        scheduler.set_coro_history_gathering,
        scheduler.set_loop_iteration_time_measurement,
        scheduler.set_global_on_start_handlers,
    ):
        switch(False)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = False

    async def initial_setup(i: Interface):
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(False)
        await i(PutCoro, monitoring_request)

    async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
        # Wait for the setup coroutine; it may already be gone, which is fine.
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            pass

        # Publish the asyncio runner through the Instance service before starting the worker.
        await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs))
        worker_result = await i(RunCoro, coro_worker, *args, **kwargs)
        return worker_result

    setup_coro: CoroWrapperBase = scheduler.put_coro(initial_setup)

    runner = RunSchedulerInAsyncioLoop(scheduler, 0.020, loop, 6)
    runner.make_idle_when_possible = True
    runner.need_to_stop_when_possible = False
    runner.register()

    return await await_coro_fast(
        loop, scheduler, CoroType.auto, main, runner, setup_coro, coro_worker, *args, **kwargs
    )
172
173
def run_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs):
    """Run ``coro_worker`` to completion on a new CoroScheduler with optional features off.

    Same flow as ``run_in_loop()``, but embedded mode, coroutine time
    measurement, history gathering, loop-iteration time measurement, global
    on-start handlers and tree monitoring are all switched off. The original
    author reports this as 30% faster than ``run_in_loop()``.

    Args:
        coro_worker (AnyWorker): worker handed to the RunCoro service.
        *args: positional arguments passed to RunCoro together with the worker.
        **kwargs: keyword arguments passed to RunCoro together with the worker.

    Raises:
        The exception recorded on the inner ``main`` coroutine, when the
        on-delete handler captured one that is not ``None``.

    Returns:
        The ``last_result`` captured from the inner ``main`` coroutine.
    """
    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    # Same switches, same order as individual calls; every one is set to False.
    for switch in (
        scheduler.turn_on_embedded_mode,
        scheduler.set_coro_time_measurement,
        scheduler.set_coro_history_gathering,
        scheduler.set_loop_iteration_time_measurement,
        scheduler.set_global_on_start_handlers,
    ):
        switch(False)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = True

    async def initial_setup(i: Interface):
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(False)
        await i(PutCoro, monitoring_request)

    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
        # Wait for the setup coroutine; it may already be gone, which is fine.
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            pass

        worker_result = await i(RunCoro, coro_worker, *args, **kwargs)
        return worker_result

    setup_coro: CoroWrapperBase = scheduler.put_coro(initial_setup)
    main_coro: CoroWrapperBase = scheduler.put_coro(main, setup_coro, coro_worker, *args, **kwargs)

    # Holders filled in by the on-delete handler below.
    outcome: ValueExistence = ValueExistence()
    failure: ValueExistence = ValueExistence()

    def gather(coro: CoroWrapperBase) -> bool:
        outcome.value = coro.last_result
        failure.value = coro.exception
        return True

    main_coro.add_on_coro_del_handler(gather)
    scheduler.loop()

    if failure.value is None:
        return outcome.value

    raise failure.value
def run_in_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
 91def run_in_loop(coro_worker: AnyWorker, *args, **kwargs):
 92    cs = CoroScheduler()
 93    set_primary_coro_scheduler(cs)
 94    cs.turn_on_embedded_mode(True)
 95    cs.set_coro_time_measurement(True)
 96    cs.set_coro_history_gathering(True)
 97    cs.set_loop_iteration_time_measurement(True)
 98    cs.set_global_on_start_handlers(True)
 99    cs.suppress_coro_exceptions = True
100    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
101    cs.use_internal_sleep = True
102
103    async def initial_setup(i: Interface):
104        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True))
105
106    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
107
108    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
109        try:
110            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
111        except CoroutineNotFoundError:
112            pass
113
114        return await i(RunCoro, coro_worker, *args, **kwargs)
115
116    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, coro_worker, *args, **kwargs)
117    result: ValueExistence = ValueExistence()
118    exception: ValueExistence = ValueExistence()
119
120    def gather(coro: CoroWrapperBase) -> bool:
121        result.value = coro.last_result
122        exception.value = coro.exception
123        return True
124
125    coro.add_on_coro_del_handler(gather)
126    cs.loop()
127    if exception.value is not None:
128        raise exception.value
129
130    return result.value
async def arun_in_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
async def arun_in_loop(coro_worker: AnyWorker, *args, **kwargs):
    """Run ``coro_worker`` on a new CoroScheduler driven from the running asyncio loop.

    A fresh scheduler is created, registered as the primary scheduler and
    configured with embedded mode, coroutine time measurement, history
    gathering, loop-iteration time measurement and global on-start handlers
    all switched on. Tree monitoring is requested through PutCoro before the
    worker is started.

    Args:
        coro_worker (AnyWorker): worker handed to the RunCoro service.
        *args: positional arguments passed to RunCoro together with the worker.
        **kwargs: keyword arguments passed to RunCoro together with the worker.

    Returns:
        Whatever ``await_coro_fast`` yields for the inner ``main`` coroutine,
        which itself returns the RunCoro result for ``coro_worker``.
    """
    loop = get_running_loop()

    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    # Same switches, same order as individual calls; every one is set to True.
    for switch in (
        scheduler.turn_on_embedded_mode,
        scheduler.set_coro_time_measurement,
        scheduler.set_coro_history_gathering,
        scheduler.set_loop_iteration_time_measurement,
        scheduler.set_global_on_start_handlers,
    ):
        switch(True)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = False

    async def initial_setup(i: Interface):
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(True)
        await i(PutCoro, monitoring_request)

    async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
        # Wait for the setup coroutine; it may already be gone, which is fine.
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            pass

        # Publish the asyncio runner through the Instance service before starting the worker.
        await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs))
        worker_result = await i(RunCoro, coro_worker, *args, **kwargs)
        return worker_result

    setup_coro: CoroWrapperBase = scheduler.put_coro(initial_setup)

    runner = RunSchedulerInAsyncioLoop(scheduler, 0.020, loop, 6)
    runner.make_idle_when_possible = True
    runner.need_to_stop_when_possible = False
    runner.register()

    return await await_coro_fast(
        loop, scheduler, CoroType.auto, main, runner, setup_coro, coro_worker, *args, **kwargs
    )
async def arun_in_fast_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
async def arun_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs):
    """Run ``coro_worker`` on a new CoroScheduler with the optional features off.

    Same flow as ``arun_in_loop()``, but embedded mode, coroutine time
    measurement, history gathering, loop-iteration time measurement, global
    on-start handlers and tree monitoring are all switched off. The original
    author reports this as 30% faster than ``arun_in_loop()``.

    Args:
        coro_worker (AnyWorker): worker handed to the RunCoro service.
        *args: positional arguments passed to RunCoro together with the worker.
        **kwargs: keyword arguments passed to RunCoro together with the worker.

    Returns:
        Whatever ``await_coro_fast`` yields for the inner ``main`` coroutine,
        which itself returns the RunCoro result for ``coro_worker``.
    """
    loop = get_running_loop()

    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    # Same switches, same order as individual calls; every one is set to False.
    for switch in (
        scheduler.turn_on_embedded_mode,
        scheduler.set_coro_time_measurement,
        scheduler.set_coro_history_gathering,
        scheduler.set_loop_iteration_time_measurement,
        scheduler.set_global_on_start_handlers,
    ):
        switch(False)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = False

    async def initial_setup(i: Interface):
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(False)
        await i(PutCoro, monitoring_request)

    async def main(i: Interface, rs: RunSchedulerInAsyncioLoop, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
        # Wait for the setup coroutine; it may already be gone, which is fine.
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            pass

        # Publish the asyncio runner through the Instance service before starting the worker.
        await i(Instance, InstanceRequest().set(RunSchedulerInAsyncioLoop, rs))
        worker_result = await i(RunCoro, coro_worker, *args, **kwargs)
        return worker_result

    setup_coro: CoroWrapperBase = scheduler.put_coro(initial_setup)

    runner = RunSchedulerInAsyncioLoop(scheduler, 0.020, loop, 6)
    runner.make_idle_when_possible = True
    runner.need_to_stop_when_possible = False
    runner.register()

    return await await_coro_fast(
        loop, scheduler, CoroType.auto, main, runner, setup_coro, coro_worker, *args, **kwargs
    )

30% faster than arun_in_loop()

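Args: coro_worker (AnyWorker): worker handed to the RunCoro service; any extra positional and keyword arguments are passed along with it.

Returns: the value obtained by awaiting the inner main coroutine, which returns the RunCoro result for coro_worker.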

def run_in_fast_loop( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs):
def run_in_fast_loop(coro_worker: AnyWorker, *args, **kwargs):
    """Run ``coro_worker`` to completion on a new CoroScheduler with optional features off.

    Same flow as ``run_in_loop()``, but embedded mode, coroutine time
    measurement, history gathering, loop-iteration time measurement, global
    on-start handlers and tree monitoring are all switched off. The original
    author reports this as 30% faster than ``run_in_loop()``.

    Args:
        coro_worker (AnyWorker): worker handed to the RunCoro service.
        *args: positional arguments passed to RunCoro together with the worker.
        **kwargs: keyword arguments passed to RunCoro together with the worker.

    Raises:
        The exception recorded on the inner ``main`` coroutine, when the
        on-delete handler captured one that is not ``None``.

    Returns:
        The ``last_result`` captured from the inner ``main`` coroutine.
    """
    scheduler = CoroScheduler()
    set_primary_coro_scheduler(scheduler)
    # Same switches, same order as individual calls; every one is set to False.
    for switch in (
        scheduler.turn_on_embedded_mode,
        scheduler.set_coro_time_measurement,
        scheduler.set_coro_history_gathering,
        scheduler.set_loop_iteration_time_measurement,
        scheduler.set_global_on_start_handlers,
    ):
        switch(False)
    scheduler.suppress_coro_exceptions = True
    scheduler.suppress_warnings_about_responses_to_not_existant_coroutines = True
    scheduler.use_internal_sleep = True

    async def initial_setup(i: Interface):
        monitoring_request = PutCoroRequest().turn_on_tree_monitoring(False)
        await i(PutCoro, monitoring_request)

    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, coro_worker: AnyWorker, *args, **kwargs):
        # Wait for the setup coroutine; it may already be gone, which is fine.
        try:
            wait_request = WaitCoroRequest().single(initial_setup_coro.coro_id)
            await i(WaitCoro, wait_request)
        except CoroutineNotFoundError:
            pass

        worker_result = await i(RunCoro, coro_worker, *args, **kwargs)
        return worker_result

    setup_coro: CoroWrapperBase = scheduler.put_coro(initial_setup)
    main_coro: CoroWrapperBase = scheduler.put_coro(main, setup_coro, coro_worker, *args, **kwargs)

    # Holders filled in by the on-delete handler below.
    outcome: ValueExistence = ValueExistence()
    failure: ValueExistence = ValueExistence()

    def gather(coro: CoroWrapperBase) -> bool:
        outcome.value = coro.last_result
        failure.value = coro.exception
        return True

    main_coro.add_on_coro_del_handler(gather)
    scheduler.loop()

    if failure.value is None:
        return outcome.value

    raise failure.value

30% faster than run_in_loop()

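Args: coro_worker (AnyWorker): worker handed to the RunCoro service; any extra positional and keyword arguments are passed along with it.

Raises: the exception recorded on the inner main coroutine, if one was captured when that coroutine was deleted.

Returns: the last result captured from the inner main coroutine.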