cengal.parallel_execution.coroutines.coro_tools.wait_coro.versions.v_0.wait_coro

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['sync_coro', 'sync_coro_param']
 20
 21
 22from cengal.parallel_execution.coroutines.coro_scheduler import *
 23# from cengal.parallel_execution.coroutines.coro_standard_services import *
 24from typing import Union, Optional, Any, List, Tuple, Dict
 25from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 26from cengal.code_flow_control.smart_values import ValueExistence
 27from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
 28from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest
 29from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PSCP
 30from cengal.parallel_execution.coroutines.coro_tools.prepare_loop import prepare_loop, prepare_fast_loop
 31
 32
 33"""
 34Module Docstring
 35Docstrings: http://www.python.org/dev/peps/pep-0257/
 36"""
 37
 38__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 39__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 40__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 41__license__ = "Apache License, Version 2.0"
 42__version__ = "4.4.1"
 43__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 44__email__ = "gtalk@butenkoms.space"
 45# __status__ = "Prototype"
 46__status__ = "Development"
 47# __status__ = "Production"
 48
 49
 50class LoopWasEndedBeforeResultWasComputed(Exception):
 51    pass
 52
 53
 54def sync_coro_param(result_required: bool = True, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, cs: Optional[CoroSchedulerType] = None, prepare_own_loop_if_not_found: bool = False, own_loop_shold_be_fast_loop: bool = True, own_loop_setup_coro_worker: Optional[AnyWorker] = None, own_loop_setup_coro_worker_args_kwargs: Optional[Tuple[Tuple, Dict]] = None):
 55    """Decorator. With an arguments. Gives ability to execute any decorated Cengal coroutine from plain sync/async 
 56    code as a sync function. Can postpone execution to the actual loop when possible if None as an immediate result 
 57    (no result) is acceptible. Can start own loop if needed.
 58
 59    Args:
 60        result_required (bool, optional): _description_. Defaults to True.
 61        timeout (Optional[float], optional): _description_. Defaults to None.
 62        kill_on_timeout (bool, optional): _description_. Defaults to True.
 63        tree (bool, optional): _description_. Defaults to True.
 64        cs (Optional[CoroSchedulerType], optional): _description_. Defaults to None.
 65        prepare_own_loop_if_not_found (bool, optional): _description_. Defaults to False.
 66        own_loop_shold_be_fast_loop (bool, optional): _description_. Defaults to True.
 67        own_loop_setup_coro_worker (Optional[AnyWorker], optional): _description_. Defaults to None.
 68        own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): _description_. Defaults to None.
 69
 70    Raises:
 71        exception: _description_
 72        exception: _description_
 73        LoopWasEndedBeforeResultWasComputed: _description_
 74        TypeError: _description_
 75        TypeError: _description_
 76
 77    Returns:
 78        _type_: _description_
 79    """    
 80    cs0 = cs
 81    def sync_coro_decorator(coro_worker: Worker):
 82        cs1 = cs0
 83        def wrapper(*args, **kwargs):
 84            cs = cs1
 85            done: ValueExistence = ValueExistence()
 86            async def coro(i: Interface, done: ValueExistence, coro_worker: Worker, *args, **kwargs):
 87                results: List[Tuple[CoroID, Any, Union[None, Exception]]] = await i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
 88                done.value = results[0]
 89            
 90            direct_execution_required: bool = False
 91            if cs is None:
 92                cs = get_available_coro_scheduler()
 93            
 94            if (cs is None) and prepare_own_loop_if_not_found:
 95                if own_loop_setup_coro_worker_args_kwargs is None:
 96                    args: Tuple = tuple()
 97                    kwargs: Dict = dict()
 98                else:
 99                    args, kwargs = own_loop_setup_coro_worker_args_kwargs
100
101                if own_loop_shold_be_fast_loop:
102                    cs, _ = prepare_fast_loop(own_loop_setup_coro_worker, *args, **kwargs)
103                else:
104                    cs, _ = prepare_loop(own_loop_setup_coro_worker, *args, **kwargs)
105
106            if cs is None:
107                direct_execution_required = True
108            else:
109                i: Optional[Interface] = None
110                inside_loop: bool = False
111                inside_coro: bool = False
112                try:
113                    i = current_interface()
114                    inside_loop = True
115                    if i is not None:
116                        inside_coro = True
117                except OutsideCoroSchedulerContext:
118                    pass
119
120                if inside_loop:
121                    if inside_coro:
122                        inside_greenlet_coro: bool = False
123                        if isinstance(i, InterfaceGreenlet):
124                            inside_greenlet_coro = True
125                        
126                        if inside_greenlet_coro:
127                            results: List[Tuple[CoroID, Any, Union[None, Exception]]] = i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
128                            _, result, exception = results[0]
129                            if exception is not None:
130                                raise exception
131                            
132                            return result
133                        else:
134                            pass
135                    else:
136                        pass
137                else:
138                    put_coro_to(get_interface_and_loop_with_explicit_loop(cs), coro, done, coro_worker, *args, **kwargs)
139                    in_work = True
140                    while in_work and (not done):
141                        in_work = cs.iteration()
142                    
143                    if done:
144                        _, result, exception = done.value
145                        if exception is not None:
146                            raise exception
147                    else:
148                        raise LoopWasEndedBeforeResultWasComputed
149
150            if result_required:
151                direct_execution_required = True
152            else:
153                put_coro_to(interface_and_loop_with_explicit_loop(cs), coro_worker, *args, **kwargs)
154
155            if direct_execution_required:
156                coro_worker_type: CoroType = find_coro_type(coro_worker)
157                if CoroType.greenlet == coro_worker_type:
158                    return coro_worker(InterfaceFake(None, None), *args, **kwargs)
159                elif CoroType.awaitable == coro_worker_type:
160                    raise TypeError(f'Can not directly execute awaitable {coro_worker} from non async environment')
161                else:
162                    raise TypeError(f'{coro_worker} is neither an awaitable nor a greenlet')
163        
164        return wrapper
165    return sync_coro_decorator
166
167
def sync_coro(coro_worker: Worker):
    """Decorator without arguments: wrap `coro_worker` with `sync_coro_param` using all defaults.

    Args:
        coro_worker (Worker): Coroutine worker to wrap.

    Returns:
        The sync wrapper produced by the default-configured `sync_coro_param` decorator.
    """
    default_decorator = sync_coro_param()
    return default_decorator(coro_worker)
def sync_coro( coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]]):
def sync_coro(coro_worker: Worker):
    """Decorator without arguments: wrap `coro_worker` with `sync_coro_param` using all defaults.

    Args:
        coro_worker (Worker): Coroutine worker to wrap.

    Returns:
        The sync wrapper produced by the default-configured `sync_coro_param` decorator.
    """
    default_decorator = sync_coro_param()
    return default_decorator(coro_worker)
def sync_coro_param( result_required: bool = True, timeout: typing.Union[float, NoneType] = None, kill_on_timeout: bool = True, tree: bool = True, cs: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType] = None, prepare_own_loop_if_not_found: bool = False, own_loop_shold_be_fast_loop: bool = True, own_loop_setup_coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]], NoneType] = None, own_loop_setup_coro_worker_args_kwargs: typing.Union[typing.Tuple[typing.Tuple, typing.Dict], NoneType] = None):
 55def sync_coro_param(result_required: bool = True, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, cs: Optional[CoroSchedulerType] = None, prepare_own_loop_if_not_found: bool = False, own_loop_shold_be_fast_loop: bool = True, own_loop_setup_coro_worker: Optional[AnyWorker] = None, own_loop_setup_coro_worker_args_kwargs: Optional[Tuple[Tuple, Dict]] = None):
 56    """Decorator. With an arguments. Gives ability to execute any decorated Cengal coroutine from plain sync/async 
 57    code as a sync function. Can postpone execution to the actual loop when possible if None as an immediate result 
 58    (no result) is acceptible. Can start own loop if needed.
 59
 60    Args:
 61        result_required (bool, optional): _description_. Defaults to True.
 62        timeout (Optional[float], optional): _description_. Defaults to None.
 63        kill_on_timeout (bool, optional): _description_. Defaults to True.
 64        tree (bool, optional): _description_. Defaults to True.
 65        cs (Optional[CoroSchedulerType], optional): _description_. Defaults to None.
 66        prepare_own_loop_if_not_found (bool, optional): _description_. Defaults to False.
 67        own_loop_shold_be_fast_loop (bool, optional): _description_. Defaults to True.
 68        own_loop_setup_coro_worker (Optional[AnyWorker], optional): _description_. Defaults to None.
 69        own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): _description_. Defaults to None.
 70
 71    Raises:
 72        exception: _description_
 73        exception: _description_
 74        LoopWasEndedBeforeResultWasComputed: _description_
 75        TypeError: _description_
 76        TypeError: _description_
 77
 78    Returns:
 79        _type_: _description_
 80    """    
 81    cs0 = cs
 82    def sync_coro_decorator(coro_worker: Worker):
 83        cs1 = cs0
 84        def wrapper(*args, **kwargs):
 85            cs = cs1
 86            done: ValueExistence = ValueExistence()
 87            async def coro(i: Interface, done: ValueExistence, coro_worker: Worker, *args, **kwargs):
 88                results: List[Tuple[CoroID, Any, Union[None, Exception]]] = await i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
 89                done.value = results[0]
 90            
 91            direct_execution_required: bool = False
 92            if cs is None:
 93                cs = get_available_coro_scheduler()
 94            
 95            if (cs is None) and prepare_own_loop_if_not_found:
 96                if own_loop_setup_coro_worker_args_kwargs is None:
 97                    args: Tuple = tuple()
 98                    kwargs: Dict = dict()
 99                else:
100                    args, kwargs = own_loop_setup_coro_worker_args_kwargs
101
102                if own_loop_shold_be_fast_loop:
103                    cs, _ = prepare_fast_loop(own_loop_setup_coro_worker, *args, **kwargs)
104                else:
105                    cs, _ = prepare_loop(own_loop_setup_coro_worker, *args, **kwargs)
106
107            if cs is None:
108                direct_execution_required = True
109            else:
110                i: Optional[Interface] = None
111                inside_loop: bool = False
112                inside_coro: bool = False
113                try:
114                    i = current_interface()
115                    inside_loop = True
116                    if i is not None:
117                        inside_coro = True
118                except OutsideCoroSchedulerContext:
119                    pass
120
121                if inside_loop:
122                    if inside_coro:
123                        inside_greenlet_coro: bool = False
124                        if isinstance(i, InterfaceGreenlet):
125                            inside_greenlet_coro = True
126                        
127                        if inside_greenlet_coro:
128                            results: List[Tuple[CoroID, Any, Union[None, Exception]]] = i(WaitCoro, WaitCoroRequest(timeout, kill_on_timeout, tree).put_list([PSCP(coro_worker, *args, **kwargs)]))
129                            _, result, exception = results[0]
130                            if exception is not None:
131                                raise exception
132                            
133                            return result
134                        else:
135                            pass
136                    else:
137                        pass
138                else:
139                    put_coro_to(get_interface_and_loop_with_explicit_loop(cs), coro, done, coro_worker, *args, **kwargs)
140                    in_work = True
141                    while in_work and (not done):
142                        in_work = cs.iteration()
143                    
144                    if done:
145                        _, result, exception = done.value
146                        if exception is not None:
147                            raise exception
148                    else:
149                        raise LoopWasEndedBeforeResultWasComputed
150
151            if result_required:
152                direct_execution_required = True
153            else:
154                put_coro_to(interface_and_loop_with_explicit_loop(cs), coro_worker, *args, **kwargs)
155
156            if direct_execution_required:
157                coro_worker_type: CoroType = find_coro_type(coro_worker)
158                if CoroType.greenlet == coro_worker_type:
159                    return coro_worker(InterfaceFake(None, None), *args, **kwargs)
160                elif CoroType.awaitable == coro_worker_type:
161                    raise TypeError(f'Can not directly execute awaitable {coro_worker} from non async environment')
162                else:
163                    raise TypeError(f'{coro_worker} is neither an awaitable nor a greenlet')
164        
165        return wrapper
166    return sync_coro_decorator

Decorator with arguments. Makes it possible to execute any decorated Cengal coroutine from plain sync/async code as a sync function. Can postpone execution to the actual loop when possible, if None as an immediate result (no result) is acceptable. Can start its own loop if needed.

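Args: result_required (bool, optional): If True, the coroutine is executed directly when its result cannot be awaited through the loop; if False, it is scheduled on the loop instead and None is returned. Defaults to True. timeout (Optional[float], optional): Forwarded to WaitCoroRequest. Defaults to None. kill_on_timeout (bool, optional): Forwarded to WaitCoroRequest. Defaults to True. tree (bool, optional): Forwarded to WaitCoroRequest. Defaults to True. cs (Optional[CoroSchedulerType], optional): Scheduler to use; when None, an available scheduler is looked up on every call. Defaults to None. prepare_own_loop_if_not_found (bool, optional): Create an own loop when no scheduler was found. Defaults to False. own_loop_shold_be_fast_loop (bool, optional): Create the own loop with prepare_fast_loop (True) or prepare_loop (False). Defaults to True. own_loop_setup_coro_worker (Optional[AnyWorker], optional): Setup worker passed to the loop-preparing function. Defaults to None. own_loop_setup_coro_worker_args_kwargs (Optional[Tuple[Tuple, Dict]], optional): (args, kwargs) pair passed along with the setup worker. Defaults to None.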

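Raises: Exception: the exception reported for the coroutine in the WaitCoro result, if any, is re-raised. LoopWasEndedBeforeResultWasComputed: the scheduler loop stopped iterating before the coroutine's result became available. TypeError: direct execution was required but the worker is an awaitable, or it is neither an awaitable nor a greenlet.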

Returns: Callable: a decorator that takes a coroutine worker and returns a plain synchronous wrapper function.