cengal.parallel_execution.coroutines.coro_tools.prepare_loop.versions.v_0.prepare_loop

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['prepare_loop', 'prepare_fast_loop']
 20
 21
 22from asyncio import AbstractEventLoop
 23from asyncio import Task as asyncio_Task
 24from asyncio import get_event_loop, get_running_loop
 25
 26from cengal.code_flow_control.smart_values import ValueExistence
 27from cengal.parallel_execution.coroutines.coro_scheduler import *
 28from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import (
 29    PutCoro, PutCoroRequest)
 30from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import \
 31    RunCoro
 32from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import (
 33    CoroutineNotFoundError, WaitCoro, WaitCoroRequest)
 34from cengal.parallel_execution.coroutines.coro_standard_services.instance import Instance, InstanceRequest
 35from cengal.parallel_execution.coroutines.coro_tools.await_coro import (
 36    RunSchedulerInAsyncioLoop, await_coro_fast)
 37from cengal.time_management.sleep_tools import get_usable_min_sleep_interval
 38from typing import Tuple, Any, Union, Optional
 39
 40"""
 41Module Docstring
 42Docstrings: http://www.python.org/dev/peps/pep-0257/
 43"""
 44
 45__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 46__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 47__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 48__license__ = "Apache License, Version 2.0"
 49__version__ = "4.4.1"
 50__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 51__email__ = "gtalk@butenkoms.space"
 52# __status__ = "Prototype"
 53__status__ = "Development"
 54# __status__ = "Production"
 55
 56
 57class LoopWasEndedBeforeSetupWasPrepared(Exception):
 58    pass
 59
 60
 61def prepare_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
 62    cs = CoroScheduler()
 63    set_primary_coro_scheduler(cs)
 64    cs.turn_on_embedded_mode(True)
 65    cs.set_coro_time_measurement(True)
 66    cs.set_coro_history_gathering(True)
 67    cs.set_loop_iteration_time_measurement(True)
 68    cs.set_global_on_start_handlers(True)
 69    cs.suppress_coro_exceptions = True
 70    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
 71    cs.use_internal_sleep = True
 72
 73    async def initial_setup(i: Interface):
 74        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True))
 75
 76    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
 77
 78    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
 79        try:
 80            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
 81        except CoroutineNotFoundError:
 82            pass
 83
 84        if setup_coro_worker is None:
 85            return None
 86        
 87        return await i(RunCoro, setup_coro_worker, *args, **kwargs)
 88
 89    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, setup_coro_worker, *args, **kwargs)
 90    result: ValueExistence = ValueExistence()
 91
 92    def gather(coro: CoroWrapperBase) -> bool:
 93        result.value = (coro.last_result, coro.exception)
 94        return True
 95
 96    coro.add_on_coro_del_handler(gather)
 97    in_work: bool = True
 98    while in_work and (not result):
 99        in_work = cs.iteration()
100    
101    if not result:
102        raise LoopWasEndedBeforeSetupWasPrepared
103    
104    result, exception = result.value
105    if exception is not None:
106        raise exception
107
108    return cs, result
109
110
111def prepare_fast_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
112    """30% faster than prepare_loop()
113
114    Args:
115        coro_worker (AnyWorker): _description_
116
117    Raises:
118        exception.value: _description_
119
120    Returns:
121        _type_: _description_
122    """
123    cs = CoroScheduler()
124    set_primary_coro_scheduler(cs)
125    cs.turn_on_embedded_mode(False)
126    cs.set_coro_time_measurement(False)
127    cs.set_coro_history_gathering(False)
128    cs.set_loop_iteration_time_measurement(False)
129    cs.set_global_on_start_handlers(False)
130    cs.suppress_coro_exceptions = True
131    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
132    cs.use_internal_sleep = True
133
134    async def initial_setup(i: Interface):
135        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False))
136
137    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
138
139    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
140        try:
141            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
142        except CoroutineNotFoundError:
143            pass
144
145        if setup_coro_worker is None:
146            return None
147
148        return await i(RunCoro, setup_coro_worker, *args, **kwargs)
149
150    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, setup_coro_worker, *args, **kwargs)
151    result: ValueExistence = ValueExistence()
152
153    def gather(coro: CoroWrapperBase) -> bool:
154        result.value = (coro.last_result, coro.exception)
155        return True
156
157    coro.add_on_coro_del_handler(gather)
158    in_work: bool = True
159    while in_work and (not result):
160        in_work = cs.iteration()
161    
162    if not result:
163        raise LoopWasEndedBeforeSetupWasPrepared
164    
165    result, exception = result.value
166    if exception is not None:
167        raise exception
168
169    return cs, result
def prepare_loop( setup_coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]], NoneType], *args, **kwargs) -> Tuple[Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[Any]]:
 62def prepare_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
 63    cs = CoroScheduler()
 64    set_primary_coro_scheduler(cs)
 65    cs.turn_on_embedded_mode(True)
 66    cs.set_coro_time_measurement(True)
 67    cs.set_coro_history_gathering(True)
 68    cs.set_loop_iteration_time_measurement(True)
 69    cs.set_global_on_start_handlers(True)
 70    cs.suppress_coro_exceptions = True
 71    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
 72    cs.use_internal_sleep = True
 73
 74    async def initial_setup(i: Interface):
 75        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(True))
 76
 77    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
 78
 79    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
 80        try:
 81            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
 82        except CoroutineNotFoundError:
 83            pass
 84
 85        if setup_coro_worker is None:
 86            return None
 87        
 88        return await i(RunCoro, setup_coro_worker, *args, **kwargs)
 89
 90    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, setup_coro_worker, *args, **kwargs)
 91    result: ValueExistence = ValueExistence()
 92
 93    def gather(coro: CoroWrapperBase) -> bool:
 94        result.value = (coro.last_result, coro.exception)
 95        return True
 96
 97    coro.add_on_coro_del_handler(gather)
 98    in_work: bool = True
 99    while in_work and (not result):
100        in_work = cs.iteration()
101    
102    if not result:
103        raise LoopWasEndedBeforeSetupWasPrepared
104    
105    result, exception = result.value
106    if exception is not None:
107        raise exception
108
109    return cs, result
def prepare_fast_loop( setup_coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]], NoneType], *args, **kwargs) -> Tuple[Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[Any]]:
112def prepare_fast_loop(setup_coro_worker: Optional[AnyWorker], *args, **kwargs) -> Tuple[CoroSchedulerType, ValueExistence[Any]]:
113    """30% faster than prepare_loop()
114
115    Args:
116        coro_worker (AnyWorker): _description_
117
118    Raises:
119        exception.value: _description_
120
121    Returns:
122        _type_: _description_
123    """
124    cs = CoroScheduler()
125    set_primary_coro_scheduler(cs)
126    cs.turn_on_embedded_mode(False)
127    cs.set_coro_time_measurement(False)
128    cs.set_coro_history_gathering(False)
129    cs.set_loop_iteration_time_measurement(False)
130    cs.set_global_on_start_handlers(False)
131    cs.suppress_coro_exceptions = True
132    cs.suppress_warnings_about_responses_to_not_existant_coroutines = True
133    cs.use_internal_sleep = True
134
135    async def initial_setup(i: Interface):
136        await i(PutCoro, PutCoroRequest().turn_on_tree_monitoring(False))
137
138    initial_setup_coro: CoroWrapperBase = cs.put_coro(initial_setup)
139
140    async def main(i: Interface, initial_setup_coro: CoroWrapperBase, setup_coro_worker: Optional[AnyWorker], *args, **kwargs):
141        try:
142            await i(WaitCoro, WaitCoroRequest().single(initial_setup_coro.coro_id))
143        except CoroutineNotFoundError:
144            pass
145
146        if setup_coro_worker is None:
147            return None
148
149        return await i(RunCoro, setup_coro_worker, *args, **kwargs)
150
151    coro: CoroWrapperBase = cs.put_coro(main, initial_setup_coro, setup_coro_worker, *args, **kwargs)
152    result: ValueExistence = ValueExistence()
153
154    def gather(coro: CoroWrapperBase) -> bool:
155        result.value = (coro.last_result, coro.exception)
156        return True
157
158    coro.add_on_coro_del_handler(gather)
159    in_work: bool = True
160    while in_work and (not result):
161        in_work = cs.iteration()
162    
163    if not result:
164        raise LoopWasEndedBeforeSetupWasPrepared
165    
166    result, exception = result.value
167    if exception is not None:
168        raise exception
169
170    return cs, result

30% faster than prepare_loop()

Args: coro_worker (AnyWorker): _description_

Raises: exception.value: _description_

Returns: _type_: _description_