cengal.parallel_execution.coroutines.coro_tools.await_coro.versions.v_0.await_coro

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18__all__ = ['await_coro_fast', 'await_coro', 'await_coro_prim', 'asyncio_coro', 'await_task_fast', 'await_task', 'await_task_prim', 
 19           'cs_awaitable', 
 20           'RunSchedulerInAsyncioLoop',
 21           'coro_interfaces_arg_manager', 'create_task', 'create_task_in_thread_pool', 'task_in_thread_pool']
 22
 23import sys
 24import asyncio
 25from inspect import signature, Signature, Parameter, isawaitable
 26from cengal.parallel_execution.coroutines.coro_scheduler import *
 27# from cengal.parallel_execution.coroutines.coro_standard_services import *
 28from cengal.code_flow_control.args_manager import ArgsManager, EArgs
 29from enum import Enum
 30from functools import partial
 31from typing import Union, Optional, Callable, Awaitable, Any, Coroutine
 32from cengal.time_management.sleep_tools import get_min_sleep_interval, try_sleep, get_usable_min_sleep_interval, get_countable_delta_time
 33from cengal.introspection.inspect import get_exception, get_exception_tripple, is_async
 34from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 35from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 36
 37from functools import wraps, update_wrapper
 38
 39
 40"""
 41Module Docstring
 42Docstrings: http://www.python.org/dev/peps/pep-0257/
 43"""
 44
 45__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 46__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 47__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 48__license__ = "Apache License, Version 2.0"
 49__version__ = "4.4.1"
 50__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 51__email__ = "gtalk@butenkoms.space"
 52# __status__ = "Prototype"
 53__status__ = "Development"
 54# __status__ = "Production"
 55
 56
 57def _async_coro_wrapper(interface: Interface, future: asyncio.Future, coro_worker: Worker, *args, **kwargs):
 58    coro_worker_result = None
 59    try:
 60        if __debug__: dlog(f'λ wrapper => {func_info(coro_worker)}')
 61        coro_worker_result = coro_worker(interface, *args, **kwargs)
 62    except:
 63        if __debug__: dlog(f'λ wrapper (Exception) <= {repr(coro_worker)}')
 64        if not future.cancelled():
 65            ex_type, ex_value, ex_traceback = get_exception_tripple()
 66            if __debug__: dlog(ex_type, ex_value, ex_traceback)
 67            ex_value = ex_value.with_traceback(ex_traceback)
 68            future.set_exception(ex_value)
 69    else:
 70        if __debug__: dlog(f'λ wrapper <= {repr(coro_worker)}')
 71        if not future.cancelled():
 72            future.set_result(coro_worker_result)
 73
 74
 75async def _awaitable_async_coro_wrapper(interface: Interface, future: asyncio.Future, coro_worker: Worker, *args, **kwargs):
 76    coro_worker_result = None
 77    try:
 78        if __debug__: dlog(f'aλ wrapper => {func_info(coro_worker)}')
 79        
 80        # # TODO: fix required!
 81        # coro_worker_result = await interface(RunCoro, CoroType.awaitable, coro_worker, *args, **kwargs)
 82        # TODO: possible fix:
 83        coro_worker_result = await coro_worker(interface, *args, **kwargs)
 84    except:
 85        if __debug__: dlog(f'aλ wrapper (Exception) <= {repr(coro_worker)}')
 86        if not future.cancelled():
 87            ex_type, ex_value, ex_traceback = get_exception_tripple()
 88            if __debug__: dlog(ex_type, ex_value, ex_traceback)
 89            ex_value = ex_value.with_traceback(ex_traceback)
 90            future.set_exception(ex_value)
 91    else:
 92        if __debug__: dlog(f'aλ wrapper <= {repr(coro_worker)}')
 93        if not future.cancelled():
 94            future.set_result(coro_worker_result)
 95
 96
 97def await_coro_fast(loop: asyncio.AbstractEventLoop,
 98                    scheduler: CoroSchedulerType, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs
 99                    ) -> asyncio.Future:
100    coro_type = coro_type or CoroType.auto
101    if CoroType.auto == coro_type:
102        coro_type = find_coro_type(coro_worker)
103    
104    future = loop.create_future()
105    if CoroType.awaitable == coro_type:
106        put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _awaitable_async_coro_wrapper), future, coro_worker, *args, **kwargs)
107    elif CoroType.greenlet == coro_type:
108        put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _async_coro_wrapper), future, coro_worker, *args, **kwargs)
109    else:
110        raise NotImplementedError
111    
112    return future
113
114
def await_coro(scheduler: CoroSchedulerType, coro_worker: Worker, *args, **kwargs) -> asyncio.Future:
    """Run ``coro_worker`` on ``scheduler`` using the current asyncio event loop.

    Shortcut for ``await_coro_fast`` with the loop taken from ``asyncio.get_event_loop()``
    and the worker type left as ``None`` (auto-detected).

    Args:
        scheduler (CoroSchedulerType): scheduler that should run the worker.
        coro_worker (Worker): worker to run.
        *args: positional arguments forwarded to the worker.
        **kwargs: keyword arguments forwarded to the worker.

    Returns:
        asyncio.Future: future that receives the worker's result or exception.
    """
    current_loop = asyncio.get_event_loop()
    return await_coro_fast(current_loop, scheduler, None, coro_worker, *args, **kwargs)
117
118
def await_coro_prim(coro_worker: Worker, *args, **kwargs) -> asyncio.Future:
    """Run ``coro_worker`` on the primary coro scheduler (it must be set before usage).

    The loop comes from ``asyncio.get_event_loop()``, the scheduler from
    ``available_coro_scheduler()``, and the worker type is auto-detected.

    Args:
        coro_worker (Worker): worker to run.
        *args: positional arguments forwarded to the worker.
        **kwargs: keyword arguments forwarded to the worker.

    Returns:
        asyncio.Future: future that receives the worker's result or exception.
    """
    current_loop = asyncio.get_event_loop()
    primary_scheduler = available_coro_scheduler()
    return await_coro_fast(current_loop, primary_scheduler, CoroType.auto, coro_worker, *args, **kwargs)
129
130
def asyncio_coro(coro_worker: Worker) -> Coroutine:
    """Decorator (used without arguments) that makes a Cengal coroutine awaitable from async code.

    The returned async function forwards its arguments to ``await_coro_prim`` and awaits
    the resulting future. Its reported signature is the signature of ``coro_worker``
    without the first parameter (the worker is called with the interface in that
    position). ``__module__``, ``__name__``, ``__qualname__`` and ``__doc__`` are copied
    from ``coro_worker`` so the decorated function stays identifiable in logs,
    tracebacks and generated documentation.

    Args:
        coro_worker (Worker): Cengal coroutine worker to decorate.

    Returns:
        Coroutine: async function that runs ``coro_worker`` through ``await_coro_prim``
        and returns its result.
    """
    # Only identifying metadata is copied: ``__annotations__`` and ``__dict__`` are left
    # alone because they describe the worker's own signature (including the interface
    # parameter that this wrapper hides).
    @wraps(coro_worker, assigned=('__module__', '__name__', '__qualname__', '__doc__'), updated=())
    async def wrapper(*args, **kwargs):
        return await await_coro_prim(coro_worker, *args, **kwargs)

    coro_worker_sign: Signature = signature(coro_worker)
    # Drop the leading (interface) parameter: callers of the wrapper never pass it.
    wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
    return wrapper
146
147
def _async_task_runner_coro_worker(interface: Interface, service_type: ServiceType, *args, **kwargs):
    """Worker that performs a single service request and returns its response.

    Calls ``interface(service_type, *args, **kwargs)`` and hands the value back to the
    caller; the request and the response are written to the debug log.

    Args:
        interface (Interface): interface used to issue the request.
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        Whatever the interface call returns.
    """
    if __debug__: dlog(f'λ wrapper <_async_task_runner_coro_worker> => interface({service_type}, {args}, {kwargs})')
    response = interface(service_type, *args, **kwargs)
    if __debug__: dlog(f'λ wrapper <_async_task_runner_coro_worker> <= interface({service_type}, {args}, {kwargs}): {response}')
    return response
153
154
def await_task_fast(loop: asyncio.AbstractEventLoop,
                    scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs
                    ) -> asyncio.Future:
    """Issue a service request on ``scheduler`` and return an asyncio future for its response.

    A greenlet-type worker that calls ``interface(service_type, *args, **kwargs)`` is handed
    to the ``PutCoro`` service; its return value or exception is stored in a future created
    on ``loop``.

    Args:
        loop (asyncio.AbstractEventLoop): loop on which the returned future is created.
        scheduler (CoroSchedulerType): scheduler that receives the ``PutCoro`` request.
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    future = loop.create_future()
    context = get_interface_and_loop_with_explicit_loop(scheduler)
    explicit_worker = ExplicitWorker(CoroType.greenlet, _async_coro_wrapper)
    put_request_to_service_with_context(context, PutCoro, explicit_worker, future, _async_task_runner_coro_worker, service_type, *args, **kwargs)
    return future
161
162
def await_task(scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs) -> asyncio.Future:
    """Issue a service request on ``scheduler`` using the current asyncio event loop.

    Shortcut for ``await_task_fast`` with the loop taken from ``asyncio.get_event_loop()``.

    Args:
        scheduler (CoroSchedulerType): scheduler that should serve the request.
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    current_loop = asyncio.get_event_loop()
    return await_task_fast(current_loop, scheduler, service_type, *args, **kwargs)
165
166
def await_task_prim(service_type: ServiceType, *args, **kwargs) -> asyncio.Future:
    """Issue a service request on the primary coro scheduler (it must be set before usage).

    The loop comes from ``asyncio.get_event_loop()`` and the scheduler from
    ``available_coro_scheduler()``.

    Args:
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    current_loop = asyncio.get_event_loop()
    primary_scheduler = available_coro_scheduler()
    return await_task_fast(current_loop, primary_scheduler, service_type, *args, **kwargs)
177
178
def cs_awaitable(coro_worker: Callable) -> Callable:
    """Decorator (no arguments). Makes any Cengal coro awaitable from async code, like an asyncio coroutine.

    When ``is_async(coro_worker)`` is true the worker is awaited directly (an object that is
    already awaitable is awaited as is, otherwise it is called first). Any other callable is
    wrapped with ``cs_coro`` and executed through the ``RunCoro`` service of the current
    interface.

    Example:
        from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro
        from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
        from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
        import asyncio

        @cs_awaitable
        def my_coro_g(a: str, b: str) -> str:
            i: Interface = current_interface()
            i(Sleep, 0.1)

            return a + b

        @cs_awaitable
        async def my_coro_a(a: str, b: str) -> str:
            i: Interface = current_interface()
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        @cs_acoro
        async def my_coro_a_implicit(a: str, b: str) -> str:
            i: Interface = current_interface()
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str:
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        async def my_coro_asyncio(a: str, b: str) -> str:
            await asyncio.sleep(0.1)

            return a + b

        @cs_awaitable
        async def amain():
            i: Interface = current_interface()

            # await

            print(await my_coro_g('a', 'b'))
            print(await my_coro_a('a', 'b'))
            print(await my_coro_a_implicit('a', 'b'))
            print(await my_coro_a_explicit(i, 'a', 'b'))
            print(await my_coro_asyncio('a', 'b'))

            # RunCoro

            print(await i(RunCoro, my_coro_g('a', 'b')))
            print(await i(RunCoro, my_coro_g, 'a', 'b'))

            print(await i(RunCoro, my_coro_a('a', 'b')))
            print(await i(RunCoro, my_coro_a, 'a', 'b'))

            print(await i(RunCoro, my_coro_a_implicit('a', 'b')))
            print(await i(RunCoro, my_coro_a_implicit, 'a', 'b'))

            print(await i(RunCoro, my_coro_a_explicit, 'a', 'b'))

            print(await i(RunCoro, my_coro_asyncio('a', 'b')))
            print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

            # PutCoro

            await i(PutCoro, my_coro_g('a', 'b'))
            await i(PutCoro, my_coro_g, 'a', 'b')

            await i(PutCoro, my_coro_a('a', 'b'))
            await i(PutCoro, my_coro_a, 'a', 'b')

            await i(PutCoro, my_coro_a_implicit('a', 'b'))
            await i(PutCoro, my_coro_a_implicit, 'a', 'b')

            await i(PutCoro, my_coro_a_explicit, 'a', 'b')

            await i(PutCoro, my_coro_asyncio('a', 'b'))
            await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

        @cs_awaitable
        def main():
            i: Interface = current_interface()

            # RunCoro

            print(i(RunCoro, my_coro_g('a', 'b')))
            print(i(RunCoro, my_coro_g, 'a', 'b'))

            print(i(RunCoro, my_coro_a('a', 'b')))
            print(i(RunCoro, my_coro_a, 'a', 'b'))

            print(i(RunCoro, my_coro_a_implicit('a', 'b')))
            print(i(RunCoro, my_coro_a_implicit, 'a', 'b'))

            print(i(RunCoro, my_coro_a_explicit, 'a', 'b'))

            print(i(RunCoro, my_coro_asyncio('a', 'b')))
            print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

            # PutCoro

            i(PutCoro, my_coro_g('a', 'b'))
            i(PutCoro, my_coro_g, 'a', 'b')

            i(PutCoro, my_coro_a('a', 'b'))
            i(PutCoro, my_coro_a, 'a', 'b')

            i(PutCoro, my_coro_a_implicit('a', 'b'))
            i(PutCoro, my_coro_a_implicit, 'a', 'b')

            i(PutCoro, my_coro_a_explicit, 'a', 'b')

            i(PutCoro, my_coro_asyncio('a', 'b'))
            i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

    Args:
        coro_worker (Callable): Cengal coroutine to adapt: a sync callable, an async callable,
            or an awaitable object.

    Returns:
        Callable: async wrapper that carries the metadata of ``coro_worker`` (``functools.wraps``).
    """
    if not is_async(coro_worker):
        @wraps(coro_worker)
        async def wrapper(*args, **kwargs) -> Any:
            # Imported at call time rather than at module import time
            # (presumably to avoid an import cycle - TODO confirm).
            from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
            current: Interface = current_interface()
            return await current(RunCoro, cs_coro(coro_worker), *args, **kwargs)

        wrapper: coro_worker
        return wrapper

    @wraps(coro_worker)
    async def wrapper(*args, **kwargs) -> Any:
        # An object that is already awaitable is awaited as is (the call arguments are
        # not used in that case); otherwise the callable is invoked to obtain one.
        target = coro_worker if isawaitable(coro_worker) else coro_worker(*args, **kwargs)
        return await target

    wrapper: coro_worker
    return wrapper
330
331
def coro_interfaces_arg_manager(event_loop, coro_scheduler: CoroSchedulerType):
    """Build ``await_coro_fast`` / ``await_task_fast`` callables bound to a loop and a scheduler.

    ``await_coro_fast`` is bound through ``ArgsManager`` with the leading arguments
    ``(event_loop, coro_scheduler, CoroType.auto)`` and ``await_task_fast`` with
    ``(event_loop, coro_scheduler)``.

    Args:
        event_loop: asyncio event loop to bind.
        coro_scheduler (CoroSchedulerType): scheduler to bind.

    Returns:
        EArgs: container with the keyword entries ``await_coro_fast`` and ``await_task_fast``
        holding the bound callables.
    """
    coro_leading_args = EArgs(event_loop, coro_scheduler, CoroType.auto)
    bound_await_coro_fast = ArgsManager(coro_leading_args).callable(await_coro_fast)

    task_leading_args = EArgs(event_loop, coro_scheduler)
    bound_await_task_fast = ArgsManager(task_leading_args).callable(await_task_fast)

    return EArgs(await_coro_fast=bound_await_coro_fast, await_task_fast=bound_await_task_fast)
341
342
class RunSchedulerInAsyncioLoop:
    """Callable that drives a Cengal coroutine scheduler from inside an asyncio event loop.

    Every call runs at most one ``scheduler.iteration()`` and then re-registers this object
    with the loop: immediately through ``call_soon`` or, when idling is enabled
    (``make_idle_when_possible``) and the scheduler reports itself idle, after a delay through
    ``call_later``. Calling ``register()`` schedules the first call.
    """

    def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1):
        """Bind the runner to ``scheduler`` and ``loop``.

        Args:
            scheduler (CoroSchedulerType): scheduler to drive; its ``on_woke_up_callback``
                attribute is replaced with this object's ``on_woke_up_callback``.
            idle_time (Optional[Union[int, float]]): requested idle period; it is rounded down
                to a whole number of minimal sleep intervals. ``None`` means
                ``get_usable_min_sleep_interval()``.
            loop (Optional[asyncio.AbstractEventLoop]): loop to register with; ``None`` means
                ``asyncio.get_event_loop()``.
            execute_every_X_iterations (int): the scheduler is iterated only on every X-th
                loop callback; values below 1 are treated as 1.
        """
        self.scheduler = scheduler
        self.scheduler.on_woke_up_callback = self.on_woke_up_callback
        if idle_time is None:
            idle_time = get_usable_min_sleep_interval()

        # Idle period rounded down to a multiple of the minimal sleep interval.
        # NOTE: this is 0 when ``idle_time`` is shorter than one minimal sleep interval.
        self.idle_time = get_min_sleep_interval() * (idle_time // get_min_sleep_interval())
        self.min_sleep_interval = max(get_min_sleep_interval(), self.idle_time or 0)
        self.usable_idle_time = (get_min_sleep_interval() * (idle_time // get_min_sleep_interval())) + get_countable_delta_time()
        self.loop = loop or asyncio.get_event_loop()
        self.handle = None  # type: Optional[asyncio.Handle]
        self.make_idle_when_possible = False
        self.need_to_stop_when_possible = False
        self.need_to_stop_now = False
        self.in_idle_state = False
        if execute_every_X_iterations < 1:
            execute_every_X_iterations = 1

        self.execute_every_X_iterations = execute_every_X_iterations
        self.current_iteration = execute_every_X_iterations

    def on_woke_up_callback(self):
        """Leave the idle state: cancel the delayed callback and register an immediate one."""
        if self.in_idle_state:
            self.cancel_handle()
            self.in_idle_state = False
            self.register()

    def __call__(self, *args, **kwargs):
        """Loop callback: run one scheduler iteration (every X-th call) and re-register."""
        self.current_iteration -= 1
        if self.current_iteration:
            # Not the X-th callback yet: just come back on the next loop iteration.
            self.register()
            return
        else:
            self.current_iteration = self.execute_every_X_iterations

        self.handle = None
        self.in_idle_state = False
        if self.need_to_stop_now:
            # A stop was requested: reset the flags and do not re-register.
            self.need_to_stop_now = False
            self.need_to_stop_when_possible = False
            return

        in_work = self.scheduler.iteration()

        if (not in_work) and self.need_to_stop_when_possible:
            # Deferred stop: the scheduler reports no work, so stop here.
            self.need_to_stop_now = False
            self.need_to_stop_when_possible = False
            return

        idle_time = self.scheduler.next_event_after()
        is_idle = self.scheduler.is_idle()
        # Idle only when idling is enabled AND the scheduler is idle; otherwise poll again at once.
        need_to_register = (not self.make_idle_when_possible) or (not is_idle)

        if need_to_register:
            self.register()
        else:
            self.register_idle(idle_time)

    def ready_to_stop(self) -> bool:
        """Return True when the scheduler reports that it is not in work."""
        return not self.scheduler.in_work()

    def register(self):
        """Schedule this object to be called on the next loop iteration."""
        self.handle = self.loop.call_soon(self)

    def register_idle(self, idle_time: Optional[float] = None):
        """Schedule this object to be called after an idle delay.

        Args:
            idle_time (Optional[float]): requested delay; ``None`` means ``self.usable_idle_time``.
                Delays shorter than ``self.min_sleep_interval`` fall back to ``register()``.
        """
        if idle_time is None:
            idle_time = self.usable_idle_time

        if idle_time < self.min_sleep_interval:
            self.register()
        else:
            self.in_idle_state = True
            # ``self.idle_time`` is 0 when the configured idle time is shorter than one minimal
            # sleep interval; dividing by it raised ZeroDivisionError. Quantize by
            # ``self.min_sleep_interval`` in that case (unchanged when ``self.idle_time`` is non-zero).
            quantum = self.idle_time or self.min_sleep_interval
            idle_time = (quantum * (idle_time // quantum)) + get_countable_delta_time()
            self.handle = self.loop.call_later(idle_time, self)

    def cancel_handle(self):
        """Cancel the pending loop callback, if there is one that is not cancelled yet."""
        if self.handle and (not self.handle.cancelled()):
            self.handle.cancel()
            self.handle = None

    def stop(self):
        """Cancel the pending loop callback and mark the runner as stopped.

        ``need_to_stop_now`` stays set, so a later call of this object (for example one
        registered by ``on_woke_up_callback``) returns without iterating the scheduler.
        """
        if self.handle and (not self.handle.cancelled()):
            self.need_to_stop_now = True
            self.handle.cancel()
            self.handle = None

    def stop_when_possible(self):
        """Ask the runner to stop after an iteration in which the scheduler reports no work."""
        self.need_to_stop_when_possible = True
442
443
444# def call_soon_future(loop: Optional[asyncio.AbstractEventLoop], awaitable_coro_obj: Awaitable):
445#     if __debug__: dlog('call_soon_future - start')
446#     loop = loop or asyncio.get_event_loop()
447#
448#     def handler():
449#         if __debug__: dlog('call_soon_future.handler - start')
450#         loop.create_task(awaitable_coro_obj)
451#         if __debug__: dlog('call_soon_future.handler - end')
452#
453#     loop.get_event_loop().call_soon(handler)
454#     if __debug__: dlog('call_soon_future - end')
455
456
def call_soon_future(loop: Optional[asyncio.AbstractEventLoop], awaitable_coro_obj: Awaitable):
    """Create a task for ``awaitable_coro_obj`` on the next iteration of ``loop``.

    A callback is registered with ``loop.call_soon``; when it runs it calls
    ``loop.create_task(awaitable_coro_obj)``. The created task is not returned.

    Args:
        loop (Optional[asyncio.AbstractEventLoop]): target loop; ``None`` means
            ``asyncio.get_event_loop()``.
        awaitable_coro_obj (Awaitable): coroutine object to wrap in a task.
    """
    if __debug__: dlog('call_soon_future - start')
    loop = loop or asyncio.get_event_loop()

    def handler():
        if __debug__: dlog('call_soon_future.handler - start')
        loop.create_task(awaitable_coro_obj)
        if __debug__: dlog('call_soon_future.handler - end')

    # Fix: event loops have no ``get_event_loop()`` method, so the original
    # ``loop.get_event_loop().call_soon(handler)`` raised AttributeError on every call.
    loop.call_soon(handler)
    if __debug__: dlog('call_soon_future - end')
468
469
def create_task(loop: Optional[asyncio.AbstractEventLoop], acoro: Callable, *args, **kwargs) -> asyncio.Task:
    """Call ``acoro(*args, **kwargs)`` and wrap the resulting coroutine object in a task.

    Args:
        loop (Optional[asyncio.AbstractEventLoop]): loop that creates the task; ``None`` means
            ``asyncio.get_event_loop()``.
        acoro (Callable): callable that returns a coroutine object (e.g. an ``async def`` function).
        *args: positional arguments passed to ``acoro``.
        **kwargs: keyword arguments passed to ``acoro``.

    Returns:
        asyncio.Task: the task created by ``loop.create_task``.
    """
    # Fix: the message used to say 'call_soon_future - start' (copied from the
    # neighbouring function), which made debug logs point at the wrong function.
    if __debug__: dlog('create_task - start')
    loop = loop or asyncio.get_event_loop()
    awaitable_coro_obj = acoro(*args, **kwargs)
    return loop.create_task(awaitable_coro_obj)
475
476
def create_task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], joiner: Optional[Callable[[Any], Awaitable[Any]]], worker: Callable, *args, **kwargs) -> asyncio.Task:
    """Run ``worker(*args, **kwargs)`` in the loop's default executor from a new task.

    When the worker finishes, its return value is passed to ``joiner`` (if one is given) and
    the awaitable returned by ``joiner`` is awaited inside the same task.

    Args:
        loop (Optional[asyncio.AbstractEventLoop]): loop that owns the task and the executor;
            ``None`` means ``asyncio.get_event_loop()``.
        joiner (Optional[Callable[[Any], Awaitable[Any]]]): async callback that receives the
            worker's result; ``None`` disables the callback.
        worker (Callable): blocking callable to execute in the executor.
        *args: positional arguments passed to ``worker``.
        **kwargs: keyword arguments passed to ``worker``.

    Returns:
        asyncio.Task: the created task. Keep a reference to it if it must not be garbage
        collected before completion (previously nothing was returned).
    """
    # Fix: resolve the loop here. Previously a ``None`` loop was forwarded to the handler,
    # which then failed on ``None.run_in_executor``.
    loop = loop or asyncio.get_event_loop()

    async def sync_handler(loop, joiner, worker, args, kwargs):
        worker_with_args = partial(worker, *args, **kwargs)
        result = await loop.run_in_executor(None, worker_with_args)
        if joiner is not None:
            await joiner(result)

    return create_task(loop, sync_handler, loop, joiner, worker, args, kwargs)
485
486
async def task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], worker: Callable, *args, **kwargs):
    """Run ``worker(*args, **kwargs)`` in the loop's default executor and return its result.

    Args:
        loop (Optional[asyncio.AbstractEventLoop]): loop whose default executor is used;
            ``None`` means ``asyncio.get_event_loop()`` (the running loop, since this is
            called from a coroutine).
        worker (Callable): blocking callable to execute.
        *args: positional arguments passed to ``worker``.
        **kwargs: keyword arguments passed to ``worker``.

    Returns:
        Whatever ``worker`` returns; an exception raised by ``worker`` propagates to the caller.
    """
    # Fix: the parameter is Optional, but ``None`` used to fail on ``None.run_in_executor``.
    loop = loop or asyncio.get_event_loop()
    worker_with_args = partial(worker, *args, **kwargs)
    return await loop.run_in_executor(None, worker_with_args)
def await_coro_fast( loop: asyncio.events.AbstractEventLoop, scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], coro_type: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroType, NoneType], coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> _asyncio.Future:
 98def await_coro_fast(loop: asyncio.AbstractEventLoop,
 99                    scheduler: CoroSchedulerType, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs
100                    ) -> asyncio.Future:
101    coro_type = coro_type or CoroType.auto
102    if CoroType.auto == coro_type:
103        coro_type = find_coro_type(coro_worker)
104    
105    future = loop.create_future()
106    if CoroType.awaitable == coro_type:
107        put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _awaitable_async_coro_wrapper), future, coro_worker, *args, **kwargs)
108    elif CoroType.greenlet == coro_type:
109        put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _async_coro_wrapper), future, coro_worker, *args, **kwargs)
110    else:
111        raise NotImplementedError
112    
113    return future
def await_coro( scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> _asyncio.Future:
def await_coro(scheduler: CoroSchedulerType, coro_worker: Worker, *args, **kwargs) -> asyncio.Future:
    """Run ``coro_worker`` on ``scheduler`` using the current asyncio event loop.

    Shortcut for ``await_coro_fast`` with the loop taken from ``asyncio.get_event_loop()``
    and the worker type left as ``None`` (auto-detected).

    Args:
        scheduler (CoroSchedulerType): scheduler that should run the worker.
        coro_worker (Worker): worker to run.
        *args: positional arguments forwarded to the worker.
        **kwargs: keyword arguments forwarded to the worker.

    Returns:
        asyncio.Future: future that receives the worker's result or exception.
    """
    current_loop = asyncio.get_event_loop()
    return await_coro_fast(current_loop, scheduler, None, coro_worker, *args, **kwargs)
def await_coro_prim( coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> _asyncio.Future:
def await_coro_prim(coro_worker: Worker, *args, **kwargs) -> asyncio.Future:
    """Run ``coro_worker`` on the primary coro scheduler (it must be set before usage).

    The loop comes from ``asyncio.get_event_loop()``, the scheduler from
    ``available_coro_scheduler()``, and the worker type is auto-detected.

    Args:
        coro_worker (Worker): worker to run.
        *args: positional arguments forwarded to the worker.
        **kwargs: keyword arguments forwarded to the worker.

    Returns:
        asyncio.Future: future that receives the worker's result or exception.
    """
    current_loop = asyncio.get_event_loop()
    primary_scheduler = available_coro_scheduler()
    return await_coro_fast(current_loop, primary_scheduler, CoroType.auto, coro_worker, *args, **kwargs)

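Tries to use the primary coro scheduler (it must be set before usage).

Args: coro_worker (Worker): Cengal coroutine worker to run; extra positional and keyword arguments are forwarded to it.

Returns: asyncio.Future: future created on the current asyncio event loop that receives the worker's result or exception.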

def asyncio_coro( coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]]) -> Coroutine:
def asyncio_coro(coro_worker: Worker) -> Coroutine:
    """Decorator (used without arguments) that makes a Cengal coroutine awaitable from async code.

    The returned async function forwards its arguments to ``await_coro_prim`` and awaits
    the resulting future. Its reported signature is the signature of ``coro_worker``
    without the first parameter (the worker is called with the interface in that
    position). ``__module__``, ``__name__``, ``__qualname__`` and ``__doc__`` are copied
    from ``coro_worker`` so the decorated function stays identifiable in logs,
    tracebacks and generated documentation.

    Args:
        coro_worker (Worker): Cengal coroutine worker to decorate.

    Returns:
        Coroutine: async function that runs ``coro_worker`` through ``await_coro_prim``
        and returns its result.
    """
    # Only identifying metadata is copied: ``__annotations__`` and ``__dict__`` are left
    # alone because they describe the worker's own signature (including the interface
    # parameter that this wrapper hides).
    @wraps(coro_worker, assigned=('__module__', '__name__', '__qualname__', '__doc__'), updated=())
    async def wrapper(*args, **kwargs):
        return await await_coro_prim(coro_worker, *args, **kwargs)

    coro_worker_sign: Signature = signature(coro_worker)
    # Drop the leading (interface) parameter: callers of the wrapper never pass it.
    wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation)
    return wrapper

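Decorator. Without arguments. Gives an ability to await any decorated Cengal coroutine from the async code.

Args: coro_worker (Worker): Cengal coroutine worker to decorate; its first parameter (the interface) is removed from the decorated function's signature.

Returns: Coroutine: async function that runs coro_worker through await_coro_prim and returns its result.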

def await_task_fast( loop: asyncio.events.AbstractEventLoop, scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], service_type: type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], *args, **kwargs) -> _asyncio.Future:
def await_task_fast(loop: asyncio.AbstractEventLoop,
                    scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs
                    ) -> asyncio.Future:
    """Issue a service request on ``scheduler`` and return an asyncio future for its response.

    A greenlet-type worker that calls ``interface(service_type, *args, **kwargs)`` is handed
    to the ``PutCoro`` service; its return value or exception is stored in a future created
    on ``loop``.

    Args:
        loop (asyncio.AbstractEventLoop): loop on which the returned future is created.
        scheduler (CoroSchedulerType): scheduler that receives the ``PutCoro`` request.
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    future = loop.create_future()
    context = get_interface_and_loop_with_explicit_loop(scheduler)
    explicit_worker = ExplicitWorker(CoroType.greenlet, _async_coro_wrapper)
    put_request_to_service_with_context(context, PutCoro, explicit_worker, future, _async_task_runner_coro_worker, service_type, *args, **kwargs)
    return future
def await_task( scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], service_type: type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], *args, **kwargs) -> _asyncio.Future:
def await_task(scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs) -> asyncio.Future:
    """Issue a service request on ``scheduler`` using the current asyncio event loop.

    Shortcut for ``await_task_fast`` with the loop taken from ``asyncio.get_event_loop()``.

    Args:
        scheduler (CoroSchedulerType): scheduler that should serve the request.
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    current_loop = asyncio.get_event_loop()
    return await_task_fast(current_loop, scheduler, service_type, *args, **kwargs)
def await_task_prim( service_type: type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], *args, **kwargs) -> _asyncio.Future:
def await_task_prim(service_type: ServiceType, *args, **kwargs) -> asyncio.Future:
    """Issue a service request on the primary coro scheduler (it must be set before usage).

    The loop comes from ``asyncio.get_event_loop()`` and the scheduler from
    ``available_coro_scheduler()``.

    Args:
        service_type (ServiceType): service to request.
        *args: positional arguments of the request.
        **kwargs: keyword arguments of the request.

    Returns:
        asyncio.Future: future that receives the service response or the raised exception.
    """
    current_loop = asyncio.get_event_loop()
    primary_scheduler = available_coro_scheduler()
    return await_task_fast(current_loop, primary_scheduler, service_type, *args, **kwargs)

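Tries to use the primary coro scheduler (it must be set before usage).

Args: service_type (ServiceType): service to request; extra positional and keyword arguments are passed along with the request.

Returns: asyncio.Future: future created on the current asyncio event loop that receives the service response or the raised exception.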

def cs_awaitable(coro_worker: typing.Callable) -> Callable:
def cs_awaitable(coro_worker: Callable) -> Callable:
    """Decorator (no arguments). Makes any Cengal coro awaitable from async code, like an asyncio coroutine.

    When ``is_async(coro_worker)`` is true the worker is awaited directly (an object that is
    already awaitable is awaited as is, otherwise it is called first). Any other callable is
    wrapped with ``cs_coro`` and executed through the ``RunCoro`` service of the current
    interface.

    Example:
        from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro
        from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
        from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
        import asyncio

        @cs_awaitable
        def my_coro_g(a: str, b: str) -> str:
            i: Interface = current_interface()
            i(Sleep, 0.1)

            return a + b

        @cs_awaitable
        async def my_coro_a(a: str, b: str) -> str:
            i: Interface = current_interface()
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        @cs_acoro
        async def my_coro_a_implicit(a: str, b: str) -> str:
            i: Interface = current_interface()
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str:
            await i(Sleep, 0.05)

            await asyncio.sleep(0.05)

            return a + b

        async def my_coro_asyncio(a: str, b: str) -> str:
            await asyncio.sleep(0.1)

            return a + b

        @cs_awaitable
        async def amain():
            i: Interface = current_interface()

            # await

            print(await my_coro_g('a', 'b'))
            print(await my_coro_a('a', 'b'))
            print(await my_coro_a_implicit('a', 'b'))
            print(await my_coro_a_explicit(i, 'a', 'b'))
            print(await my_coro_asyncio('a', 'b'))

            # RunCoro

            print(await i(RunCoro, my_coro_g('a', 'b')))
            print(await i(RunCoro, my_coro_g, 'a', 'b'))

            print(await i(RunCoro, my_coro_a('a', 'b')))
            print(await i(RunCoro, my_coro_a, 'a', 'b'))

            print(await i(RunCoro, my_coro_a_implicit('a', 'b')))
            print(await i(RunCoro, my_coro_a_implicit, 'a', 'b'))

            print(await i(RunCoro, my_coro_a_explicit, 'a', 'b'))

            print(await i(RunCoro, my_coro_asyncio('a', 'b')))
            print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

            # PutCoro

            await i(PutCoro, my_coro_g('a', 'b'))
            await i(PutCoro, my_coro_g, 'a', 'b')

            await i(PutCoro, my_coro_a('a', 'b'))
            await i(PutCoro, my_coro_a, 'a', 'b')

            await i(PutCoro, my_coro_a_implicit('a', 'b'))
            await i(PutCoro, my_coro_a_implicit, 'a', 'b')

            await i(PutCoro, my_coro_a_explicit, 'a', 'b')

            await i(PutCoro, my_coro_asyncio('a', 'b'))
            await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

        @cs_awaitable
        def main():
            i: Interface = current_interface()

            # RunCoro

            print(i(RunCoro, my_coro_g('a', 'b')))
            print(i(RunCoro, my_coro_g, 'a', 'b'))

            print(i(RunCoro, my_coro_a('a', 'b')))
            print(i(RunCoro, my_coro_a, 'a', 'b'))

            print(i(RunCoro, my_coro_a_implicit('a', 'b')))
            print(i(RunCoro, my_coro_a_implicit, 'a', 'b'))

            print(i(RunCoro, my_coro_a_explicit, 'a', 'b'))

            print(i(RunCoro, my_coro_asyncio('a', 'b')))
            print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

            # PutCoro

            i(PutCoro, my_coro_g('a', 'b'))
            i(PutCoro, my_coro_g, 'a', 'b')

            i(PutCoro, my_coro_a('a', 'b'))
            i(PutCoro, my_coro_a, 'a', 'b')

            i(PutCoro, my_coro_a_implicit('a', 'b'))
            i(PutCoro, my_coro_a_implicit, 'a', 'b')

            i(PutCoro, my_coro_a_explicit, 'a', 'b')

            i(PutCoro, my_coro_asyncio('a', 'b'))
            i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

    Args:
        coro_worker (Callable): Cengal coroutine to adapt: a sync callable, an async callable,
            or an awaitable object.

    Returns:
        Callable: async wrapper that carries the metadata of ``coro_worker`` (``functools.wraps``).
    """
    if not is_async(coro_worker):
        @wraps(coro_worker)
        async def wrapper(*args, **kwargs) -> Any:
            # Imported at call time rather than at module import time
            # (presumably to avoid an import cycle - TODO confirm).
            from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
            current: Interface = current_interface()
            return await current(RunCoro, cs_coro(coro_worker), *args, **kwargs)

        wrapper: coro_worker
        return wrapper

    @wraps(coro_worker)
    async def wrapper(*args, **kwargs) -> Any:
        # An object that is already awaitable is awaited as is (the call arguments are
        # not used in that case); otherwise the callable is invoked to obtain one.
        target = coro_worker if isawaitable(coro_worker) else coro_worker(*args, **kwargs)
        return await target

    wrapper: coro_worker
    return wrapper

Decorator. Without arguments. Makes any Cengal coro awaitable from the async code (like coroutines in asyncio).

Example:

from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro
from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
import asyncio

@cs_awaitable
def my_coro_g(a: str, b: str) -> str:
    i: Interface = current_interface()
    i(Sleep, 0.1)

    return a + b

@cs_awaitable
async def my_coro_a(a: str, b: str) -> str:
    i: Interface = current_interface()
    await i(Sleep, 0.05)

    await asyncio.sleep(0.05)

    return a + b

@cs_acoro
async def my_coro_a_implicit(a: str, b: str) -> str:
    i: Interface = current_interface()
    await i(Sleep, 0.05)

    await asyncio.sleep(0.05)

    return a + b

async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str:
    await i(Sleep, 0.05)

    await asyncio.sleep(0.05)

    return a + b

async def my_coro_asyncio(a: str, b: str) -> str:
    await asyncio.sleep(0.1)

    return a + b

@cs_awaitable
async def amain():
    i: Interface = current_interface()

    # await

    print(await my_coro_g('a', 'b'))
    print(await my_coro_a('a', 'b'))
    print(await my_coro_a_implicit('a', 'b'))
    print(await my_coro_a_explicit(i, 'a', 'b'))
    print(await my_coro_asyncio('a', 'b'))

    # RunCoro

    print(await i(RunCoro, my_coro_g('a', 'b')))
    print(await i(RunCoro, my_coro_g, 'a', 'b'))

    print(await i(RunCoro, my_coro_a('a', 'b')))
    print(await i(RunCoro, my_coro_a, 'a', 'b'))

    print(await i(RunCoro, my_coro_a_implicit('a', 'b')))
    print(await i(RunCoro, my_coro_a_implicit, 'a', 'b'))

    print(await i(RunCoro, my_coro_a_explicit, 'a', 'b'))

    print(await i(RunCoro, my_coro_asyncio('a', 'b')))
    print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

    # PutCoro

    await i(PutCoro, my_coro_g('a', 'b'))
    await i(PutCoro, my_coro_g, 'a', 'b')

    await i(PutCoro, my_coro_a('a', 'b'))
    await i(PutCoro, my_coro_a, 'a', 'b')

    await i(PutCoro, my_coro_a_implicit('a', 'b'))
    await i(PutCoro, my_coro_a_implicit, 'a', 'b')

    await i(PutCoro, my_coro_a_explicit, 'a', 'b')

    await i(PutCoro, my_coro_asyncio('a', 'b'))
    await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

@cs_awaitable
def main():
    i: Interface = current_interface()

    # RunCoro

    print(i(RunCoro, my_coro_g('a', 'b')))
    print(i(RunCoro, my_coro_g, 'a', 'b'))

    print(i(RunCoro, my_coro_a('a', 'b')))
    print(i(RunCoro, my_coro_a, 'a', 'b'))

    print(i(RunCoro, my_coro_a_implicit('a', 'b')))
    print(i(RunCoro, my_coro_a_implicit, 'a', 'b'))

    print(i(RunCoro, my_coro_a_explicit, 'a', 'b'))

    print(i(RunCoro, my_coro_asyncio('a', 'b')))
    print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))

    # PutCoro

    i(PutCoro, my_coro_g('a', 'b'))
    i(PutCoro, my_coro_g, 'a', 'b')

    i(PutCoro, my_coro_a('a', 'b'))
    i(PutCoro, my_coro_a, 'a', 'b')

    i(PutCoro, my_coro_a_implicit('a', 'b'))
    i(PutCoro, my_coro_a_implicit, 'a', 'b')

    i(PutCoro, my_coro_a_explicit, 'a', 'b')

    i(PutCoro, my_coro_asyncio('a', 'b'))
    i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')

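Args: coro_worker (Worker): the Cengal coroutine worker to make awaitable; it may be either an async or a non-async worker.

Returns: Coroutine: an async wrapper (carrying the metadata of coro_worker) that awaits the worker directly when it is async, and otherwise runs it through the RunCoro service of the current interface.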

class RunSchedulerInAsyncioLoop:
class RunSchedulerInAsyncioLoop:
    """Run a Cengal coroutine scheduler from callbacks of an asyncio event loop.

    The instance itself is the callback given to the loop. Each effective call
    performs one ``scheduler.iteration()`` and then re-registers the instance,
    either immediately (``loop.call_soon``) or, when ``make_idle_when_possible``
    is set and the scheduler reports that it is idle, after a delay
    (``loop.call_later``).
    """

    def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1):
        """Bind the runner to a scheduler and an event loop.

        Args:
            scheduler: scheduler to drive; its ``on_woke_up_callback`` attribute
                is replaced with this runner's ``on_woke_up_callback``.
            idle_time: requested idle granularity; ``None`` means
                ``get_usable_min_sleep_interval()``. The value is rounded down to
                a whole number of ``get_min_sleep_interval()`` units.
            loop: event loop to register on; ``None`` means
                ``asyncio.get_event_loop()``.
            execute_every_X_iterations: the scheduler is iterated only on every
                X-th loop callback; values below 1 are treated as 1.
        """
        self.scheduler = scheduler
        self.scheduler.on_woke_up_callback = self.on_woke_up_callback
        if idle_time is None:
            idle_time = get_usable_min_sleep_interval()

        # Quantise the requested idle time to whole minimal sleep intervals.
        # The result is 0 when the request is shorter than one interval.
        min_interval = get_min_sleep_interval()
        self.idle_time = min_interval * (idle_time // min_interval)
        self.min_sleep_interval = max(min_interval, self.idle_time or 0)
        self.usable_idle_time = self.idle_time + get_countable_delta_time()
        self.loop = loop or asyncio.get_event_loop()
        self.handle = None  # type: Optional[asyncio.Handle]
        self.make_idle_when_possible = False
        self.need_to_stop_when_possible = False
        self.need_to_stop_now = False
        self.in_idle_state = False
        if execute_every_X_iterations < 1:
            execute_every_X_iterations = 1

        self.execute_every_X_iterations = execute_every_X_iterations
        self.current_iteration = execute_every_X_iterations

    def on_woke_up_callback(self):
        """Leave the idle wait early: cancel the pending handle and run on the next loop pass.

        Does nothing unless the runner is currently in the idle state.
        """
        if self.in_idle_state:
            self.cancel_handle()
            self.in_idle_state = False
            self.register()

    def __call__(self, *args, **kwargs):
        """Loop callback: run one scheduler iteration and schedule the next call.

        Positional and keyword arguments are accepted and ignored.
        """
        # Only every ``execute_every_X_iterations``-th callback iterates the scheduler
        self.current_iteration -= 1
        if self.current_iteration:
            self.register()
            return

        self.current_iteration = self.execute_every_X_iterations

        self.handle = None
        self.in_idle_state = False
        if self.need_to_stop_now:
            # Consume the stop request without re-registering
            self.need_to_stop_now = False
            self.need_to_stop_when_possible = False
            return

        in_work = self.scheduler.iteration()

        if (not in_work) and self.need_to_stop_when_possible:
            # Deferred stop: the iteration reported no remaining work
            self.need_to_stop_now = False
            self.need_to_stop_when_possible = False
            return

        idle_time = self.scheduler.next_event_after()
        is_idle = self.scheduler.is_idle()
        if self.make_idle_when_possible and is_idle:
            self.register_idle(idle_time)
        else:
            self.register()

    def ready_to_stop(self) -> bool:
        """Return True when ``scheduler.in_work()`` is falsy."""
        return not self.scheduler.in_work()

    def register(self):
        """Schedule this runner on the loop with ``call_soon`` and remember the handle."""
        self.handle = self.loop.call_soon(self)

    def register_idle(self, idle_time: Optional[float] = None):
        """Schedule the next run after an idle delay.

        Args:
            idle_time: requested delay; ``None`` means ``usable_idle_time``.
                Delays shorter than ``min_sleep_interval`` fall back to
                ``register()``. Longer delays are rounded down to a multiple of
                the idle quantum, with ``get_countable_delta_time()`` added.
        """
        if idle_time is None:
            idle_time = self.usable_idle_time

        if idle_time < self.min_sleep_interval:
            self.register()
            return

        self.in_idle_state = True
        # ``self.idle_time`` is 0 when the constructor's idle time was shorter than
        # one minimal sleep interval; dividing by it would raise ZeroDivisionError,
        # so fall back to ``min_sleep_interval`` as the rounding quantum.
        quantum = self.idle_time or self.min_sleep_interval
        delay = (quantum * (idle_time // quantum)) + get_countable_delta_time()
        self.handle = self.loop.call_later(delay, self)

    def cancel_handle(self):
        """Cancel and forget the pending loop handle, if there is a live one."""
        if self.handle and (not self.handle.cancelled()):
            self.handle.cancel()
            self.handle = None

    def stop(self):
        """Stop immediately by cancelling the pending loop handle.

        NOTE(review): this is a no-op when there is no live handle, which includes
        the time while ``__call__`` is executing (it clears ``self.handle`` before
        iterating the scheduler) - confirm whether callers rely on that.
        """
        if self.handle and (not self.handle.cancelled()):
            self.need_to_stop_now = True
            self.cancel_handle()

    def stop_when_possible(self):
        """Request a deferred stop, honoured once an iteration reports no work."""
        self.need_to_stop_when_possible = True
RunSchedulerInAsyncioLoop( scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], idle_time: typing.Union[int, float, NoneType] = None, loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType] = None, execute_every_X_iterations: int = 1)
345    def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1):
346        self.scheduler = scheduler
347        self.scheduler.on_woke_up_callback = self.on_woke_up_callback
348        if idle_time is None:
349            idle_time = get_usable_min_sleep_interval()
350        
351        self.idle_time = get_min_sleep_interval() * (idle_time // get_min_sleep_interval())
352        self.min_sleep_interval = max(get_min_sleep_interval(), self.idle_time or 0)
353        self.usable_idle_time = (get_min_sleep_interval() * (idle_time // get_min_sleep_interval())) + get_countable_delta_time()
354        self.loop = loop or asyncio.get_event_loop()
355        self.handle = None  # type: Optional[asyncio.Handle]
356        self.make_idle_when_possible = False
357        self.need_to_stop_when_possible = False
358        self.need_to_stop_now = False
359        self.in_idle_state = False
360        if execute_every_X_iterations < 1:
361            execute_every_X_iterations = 1
362        
363        self.execute_every_X_iterations = execute_every_X_iterations
364        self.current_iteration = execute_every_X_iterations
scheduler
idle_time
min_sleep_interval
usable_idle_time
loop
handle
make_idle_when_possible
need_to_stop_when_possible
need_to_stop_now
in_idle_state
execute_every_X_iterations
current_iteration
def on_woke_up_callback(self):
366    def on_woke_up_callback(self):
367        if self.in_idle_state:
368            self.cancel_handle()
369            self.in_idle_state = False
370            self.register()
def ready_to_stop(self) -> bool:
409    def ready_to_stop(self) -> bool:
410        return not self.scheduler.in_work()
def register(self):
412    def register(self):
413        # self.register_idle()
414        self.handle = self.loop.call_soon(self)
def register_idle(self, idle_time: typing.Union[float, NoneType] = None):
416    def register_idle(self, idle_time: Optional[float] = None):
417        if idle_time is None:
418            idle_time = self.usable_idle_time
419        
420        # self.handle = self.loop.call_later(self.idle_time, self)
421        if idle_time < self.min_sleep_interval:
422            self.register()
423        else:
424            # self.handle = try_sleep(self.idle_time * (idle_time // self.idle_time), self.loop.call_later, self)
425            # # self.handle = try_sleep(0.001, self.loop.call_later, self)
426            self.in_idle_state = True
427            idle_time = (self.idle_time * (idle_time // self.idle_time)) + get_countable_delta_time()
428            self.handle = self.loop.call_later(idle_time, self)
def cancel_handle(self):
430    def cancel_handle(self):
431        if self.handle and (not self.handle.cancelled()):
432            self.handle.cancel()
433            self.handle = None
def stop(self):
435    def stop(self):
436        if self.handle and (not self.handle.cancelled()):
437            self.need_to_stop_now = True
438            self.handle.cancel()
439            self.handle = None
def stop_when_possible(self):
441    def stop_when_possible(self):
442        self.need_to_stop_when_possible = True
def coro_interfaces_arg_manager( event_loop, coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable]):
def coro_interfaces_arg_manager(event_loop, coro_scheduler: CoroSchedulerType):
    """Wrap the fast await helpers with a preset event loop and scheduler.

    Builds two ``ArgsManager`` callables: one around ``await_coro_fast`` with
    ``(event_loop, coro_scheduler, CoroType.auto)`` and one around
    ``await_task_fast`` with ``(event_loop, coro_scheduler)``.

    Returns:
        An ``EArgs`` with the keyword entries ``await_coro_fast`` and
        ``await_task_fast`` holding those callables.
    """
    coro_preset = EArgs(event_loop, coro_scheduler, CoroType.auto)
    bound_await_coro = ArgsManager(coro_preset).callable(await_coro_fast)

    task_preset = EArgs(event_loop, coro_scheduler)
    bound_await_task = ArgsManager(task_preset).callable(await_task_fast)

    return EArgs(await_coro_fast=bound_await_coro, await_task_fast=bound_await_task)
def create_task( loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType], acoro: typing.Callable, *args, **kwargs) -> _asyncio.Task:
def create_task(loop: Optional[asyncio.AbstractEventLoop], acoro: Callable, *args, **kwargs) -> asyncio.Task:
    """Call ``acoro(*args, **kwargs)`` and schedule the result as a task.

    Args:
        loop: event loop to create the task on; ``None`` means
            ``asyncio.get_event_loop()``.
        acoro: callable whose result is passed to ``loop.create_task``.
        *args: positional arguments for ``acoro``.
        **kwargs: keyword arguments for ``acoro``.

    Returns:
        The task returned by ``loop.create_task``.
    """
    # The debug message previously named a different function ('call_soon_future')
    if __debug__: dlog('create_task - start')
    loop = loop or asyncio.get_event_loop()
    awaitable_coro_obj = acoro(*args, **kwargs)
    return loop.create_task(awaitable_coro_obj)
def create_task_in_thread_pool( loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType], joiner: typing.Union[typing.Awaitable[typing.Any], NoneType], worker: typing.Callable, *args, **kwargs):
def create_task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], joiner: Optional[Awaitable[Any]], worker: Callable, *args, **kwargs):
    """Run ``worker`` in the loop's default executor from a background task.

    Args:
        loop: event loop to use; ``None`` means ``asyncio.get_event_loop()``.
        joiner: optional callable invoked as ``await joiner(result)`` with the
            worker's return value; skipped when ``None``.
        worker: callable executed via ``loop.run_in_executor(None, ...)``.
        *args: positional arguments for ``worker``.
        **kwargs: keyword arguments for ``worker``.

    Returns:
        The task created by ``create_task``. Earlier versions returned ``None``
        and discarded the task; callers may ignore the value as before.
    """
    # Resolve the default here: sync_handler calls loop.run_in_executor directly,
    # so a None loop would raise AttributeError inside the task.
    loop = loop or asyncio.get_event_loop()

    async def sync_handler(loop, joiner, worker, args, kwargs):
        worker_with_args = partial(worker, *args, **kwargs)
        result = await loop.run_in_executor(None, worker_with_args)
        if joiner is not None:
            await joiner(result)

    return create_task(loop, sync_handler, loop, joiner, worker, args, kwargs)
async def task_in_thread_pool( loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType], worker: typing.Callable, *args, **kwargs):
async def task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], worker: Callable, *args, **kwargs):
    """Run ``worker(*args, **kwargs)`` in the loop's default executor and return its result.

    Args:
        loop: event loop whose executor is used; ``None`` means
            ``asyncio.get_event_loop()``.
        worker: callable to execute in the executor.
        *args: positional arguments for ``worker``.
        **kwargs: keyword arguments for ``worker``.

    Returns:
        Whatever ``worker`` returns.
    """
    # ``loop`` is declared Optional; without this default a None loop raised
    # AttributeError on ``loop.run_in_executor``.
    loop = loop or asyncio.get_event_loop()
    # run_in_executor forwards positional arguments only, so bind everything first
    worker_with_args = partial(worker, *args, **kwargs)
    return await loop.run_in_executor(None, worker_with_args)