cengal.parallel_execution.coroutines.coro_tools.await_coro.versions.v_0.await_coro
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18__all__ = ['await_coro_fast', 'await_coro', 'await_coro_prim', 'asyncio_coro', 'await_task_fast', 'await_task', 'await_task_prim', 19 'cs_awaitable', 20 'RunSchedulerInAsyncioLoop', 21 'coro_interfaces_arg_manager', 'create_task', 'create_task_in_thread_pool', 'task_in_thread_pool'] 22 23import sys 24import asyncio 25from inspect import signature, Signature, Parameter, isawaitable 26from cengal.parallel_execution.coroutines.coro_scheduler import * 27# from cengal.parallel_execution.coroutines.coro_standard_services import * 28from cengal.code_flow_control.args_manager import ArgsManager, EArgs 29from enum import Enum 30from functools import partial 31from typing import Union, Optional, Callable, Awaitable, Any, Coroutine 32from cengal.time_management.sleep_tools import get_min_sleep_interval, try_sleep, get_usable_min_sleep_interval, get_countable_delta_time 33from cengal.introspection.inspect import get_exception, get_exception_tripple, is_async 34from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import * 35from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro 36 37from functools import wraps, update_wrapper 38 39 40""" 41Module Docstring 42Docstrings: http://www.python.org/dev/peps/pep-0257/ 43""" 44 45__author__ = "ButenkoMS <gtalk@butenkoms.space>" 46__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 47__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 48__license__ = "Apache License, Version 2.0" 49__version__ = "4.4.1" 50__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 51__email__ = "gtalk@butenkoms.space" 52# __status__ = "Prototype" 53__status__ = "Development" 54# __status__ = "Production" 55 56 57def _async_coro_wrapper(interface: Interface, future: asyncio.Future, coro_worker: Worker, *args, **kwargs): 58 coro_worker_result = None 59 try: 60 if __debug__: dlog(f'λ wrapper => {func_info(coro_worker)}') 61 coro_worker_result = coro_worker(interface, *args, **kwargs) 62 except: 63 if __debug__: dlog(f'λ wrapper (Exception) <= {repr(coro_worker)}') 64 if not future.cancelled(): 65 ex_type, ex_value, ex_traceback = get_exception_tripple() 66 if __debug__: dlog(ex_type, ex_value, ex_traceback) 67 ex_value = ex_value.with_traceback(ex_traceback) 68 future.set_exception(ex_value) 69 else: 70 if __debug__: dlog(f'λ wrapper <= {repr(coro_worker)}') 71 if not future.cancelled(): 72 future.set_result(coro_worker_result) 73 74 75async def _awaitable_async_coro_wrapper(interface: Interface, future: asyncio.Future, coro_worker: Worker, *args, **kwargs): 76 coro_worker_result = None 77 try: 78 if __debug__: dlog(f'aλ wrapper => {func_info(coro_worker)}') 79 80 # # TODO: fix required! 81 # coro_worker_result = await interface(RunCoro, CoroType.awaitable, coro_worker, *args, **kwargs) 82 # TODO: possible fix: 83 coro_worker_result = await coro_worker(interface, *args, **kwargs) 84 except: 85 if __debug__: dlog(f'aλ wrapper (Exception) <= {repr(coro_worker)}') 86 if not future.cancelled(): 87 ex_type, ex_value, ex_traceback = get_exception_tripple() 88 if __debug__: dlog(ex_type, ex_value, ex_traceback) 89 ex_value = ex_value.with_traceback(ex_traceback) 90 future.set_exception(ex_value) 91 else: 92 if __debug__: dlog(f'aλ wrapper <= {repr(coro_worker)}') 93 if not future.cancelled(): 94 future.set_result(coro_worker_result) 95 96 97def await_coro_fast(loop: asyncio.AbstractEventLoop, 98 scheduler: CoroSchedulerType, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs 99 ) -> asyncio.Future: 100 coro_type = coro_type or CoroType.auto 101 if CoroType.auto == coro_type: 102 coro_type = find_coro_type(coro_worker) 103 104 future = loop.create_future() 105 if CoroType.awaitable == coro_type: 106 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _awaitable_async_coro_wrapper), future, coro_worker, *args, **kwargs) 107 elif CoroType.greenlet == coro_type: 108 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _async_coro_wrapper), future, coro_worker, *args, **kwargs) 109 else: 110 raise NotImplementedError 111 112 return future 113 114 115def await_coro(scheduler: CoroSchedulerType, coro_worker: Worker, *args, **kwargs) -> asyncio.Future: 116 return await_coro_fast(asyncio.get_event_loop(), scheduler, None, coro_worker, *args, **kwargs) 117 118 119def await_coro_prim(coro_worker: Worker, *args, **kwargs) -> asyncio.Future: 120 """Tries to use primary coro scheduler (must be set before usage) 121 122 Args: 123 coro_worker (Worker): _description_ 124 125 Returns: 126 asyncio.Future: _description_ 127 """ 128 return await_coro_fast(asyncio.get_event_loop(), available_coro_scheduler(), CoroType.auto, coro_worker, *args, **kwargs) 129 130 131def asyncio_coro(coro_worker: Worker) -> Coroutine: 132 """Decorator. Without arguments. Gives an ability to await any decorated Cengal coroutine from the async code 133 134 Args: 135 coro_worker (Worker): _description_ 136 137 Returns: 138 Coroutine: _description_ 139 """ 140 async def wrapper(*args, **kwargs): 141 return await await_coro_prim(coro_worker, *args, **kwargs) 142 143 coro_worker_sign: Signature = signature(coro_worker) 144 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 145 return wrapper 146 147 148def _async_task_runner_coro_worker(interface: Interface, service_type: ServiceType, *args, **kwargs): 149 if __debug__: dlog(f'λ wrapper <_async_task_runner_coro_worker> => interface({service_type}, {args}, {kwargs})') 150 result = interface(service_type, *args, **kwargs) 151 if __debug__: dlog(f'λ wrapper <_async_task_runner_coro_worker> <= interface({service_type}, {args}, {kwargs}): {result}') 152 return result 153 154 155def await_task_fast(loop: asyncio.AbstractEventLoop, 156 scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs 157 ) -> asyncio.Future: 158 future = loop.create_future() 159 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(CoroType.greenlet, _async_coro_wrapper), future, _async_task_runner_coro_worker, service_type, *args, **kwargs) 160 return future 161 162 163def await_task(scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs) -> asyncio.Future: 164 return await_task_fast(asyncio.get_event_loop(), scheduler, service_type, *args, **kwargs) 165 166 167def await_task_prim(service_type: ServiceType, *args, **kwargs) -> asyncio.Future: 168 """Tries to use primary coro scheduler (must be set before usage) 169 170 Args: 171 service_type (ServiceType): _description_ 172 173 Returns: 174 asyncio.Future: _description_ 175 """ 176 return await_task_fast(asyncio.get_event_loop(), available_coro_scheduler(), service_type, *args, **kwargs) 177 178 179def cs_awaitable(coro_worker: Callable) -> Callable: 180 """Decorator. Without arguments. Makes any Cengal coro awaitable from the async code (like coroutines in asyncio) 181 Example: 182 from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro 183 from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 184 from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro 185 from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep 186 import asyncio 187 188 @cs_awaitable 189 def my_coro_g(a: str, b: str) -> str: 190 i: Interface = current_interface() 191 i(Sleep, 0.1) 192 193 return a + b 194 195 @cs_awaitable 196 async def my_coro_a(a: str, b: str) -> str: 197 i: Interface = current_interface() 198 await i(Sleep, 0.05) 199 200 await asyncio.sleep(0.05) 201 202 return a + b 203 204 @cs_acoro 205 async def my_coro_a_implicit(a: str, b: str) -> str: 206 i: Interface = current_interface() 207 await i(Sleep, 0.05) 208 209 await asyncio.sleep(0.05) 210 211 return a + b 212 213 async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str: 214 await i(Sleep, 0.05) 215 216 await asyncio.sleep(0.05) 217 218 return a + b 219 220 async def my_coro_asyncio(a: str, b: str) -> str: 221 await asyncio.sleep(0.1) 222 223 return a + b 224 225 @cs_awaitable 226 async def amain(): 227 i: Interface = current_interface() 228 229 # await 230 231 print(await my_coro_g('a', 'b')) 232 print(await my_coro_a('a', 'b')) 233 print(await my_coro_a_implicit('a', 'b')) 234 print(await my_coro_a_explicit(i, 'a', 'b')) 235 print(await my_coro_asyncio('a', 'b')) 236 237 # RunCoro 238 239 print(await i(RunCoro, my_coro_g('a', 'b'))) 240 print(await i(RunCoro, my_coro_g, 'a', 'b')) 241 242 print(await i(RunCoro, my_coro_a('a', 'b'))) 243 print(await i(RunCoro, my_coro_a, 'a', 'b')) 244 245 print(await i(RunCoro, my_coro_a_implicit('a', 'b'))) 246 print(await i(RunCoro, my_coro_a_implicit, 'a', 'b')) 247 248 print(await i(RunCoro, my_coro_a_explicit, 'a', 'b')) 249 250 print(await i(RunCoro, my_coro_asyncio('a', 'b'))) 251 print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b')) 252 253 # PutCoro 254 255 await i(PutCoro, my_coro_g('a', 'b')) 256 await i(PutCoro, my_coro_g, 'a', 'b') 257 258 await i(PutCoro, my_coro_a('a', 'b')) 259 await i(PutCoro, my_coro_a, 'a', 'b') 260 261 await i(PutCoro, my_coro_a_implicit('a', 'b')) 262 await i(PutCoro, my_coro_a_implicit, 'a', 'b') 263 264 await i(PutCoro, my_coro_a_explicit, 'a', 'b') 265 266 await i(PutCoro, my_coro_asyncio('a', 'b')) 267 await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b') 268 269 @cs_awaitable 270 def main(): 271 i: Interface = current_interface() 272 273 # RunCoro 274 275 print(i(RunCoro, my_coro_g('a', 'b'))) 276 print(i(RunCoro, my_coro_g, 'a', 'b')) 277 278 print(i(RunCoro, my_coro_a('a', 'b'))) 279 print(i(RunCoro, my_coro_a, 'a', 'b')) 280 281 print(i(RunCoro, my_coro_a_implicit('a', 'b'))) 282 print(i(RunCoro, my_coro_a_implicit, 'a', 'b')) 283 284 print(i(RunCoro, my_coro_a_explicit, 'a', 'b')) 285 286 print(i(RunCoro, my_coro_asyncio('a', 'b'))) 287 print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b')) 288 289 # PutCoro 290 291 i(PutCoro, my_coro_g('a', 'b')) 292 i(PutCoro, my_coro_g, 'a', 'b') 293 294 i(PutCoro, my_coro_a('a', 'b')) 295 i(PutCoro, my_coro_a, 'a', 'b') 296 297 i(PutCoro, my_coro_a_implicit('a', 'b')) 298 i(PutCoro, my_coro_a_implicit, 'a', 'b') 299 300 i(PutCoro, my_coro_a_explicit, 'a', 'b') 301 302 i(PutCoro, my_coro_asyncio('a', 'b')) 303 i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b') 304 305 Args: 306 coro_worker (Worker): _description_ 307 308 Returns: 309 Coroutine: _description_ 310 """ 311 if is_async(coro_worker): 312 @wraps(coro_worker) 313 async def wrapper(*args, **kwargs) -> Any: 314 if isawaitable(coro_worker): 315 return await coro_worker 316 else: 317 return await coro_worker(*args, **kwargs) 318 319 wrapper: coro_worker 320 return wrapper 321 else: 322 @wraps(coro_worker) 323 async def wrapper(*args, **kwargs) -> Any: 324 from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 325 i: Interface = current_interface() 326 return await i(RunCoro, cs_coro(coro_worker), *args, **kwargs) 327 328 wrapper: coro_worker 329 return wrapper 330 331 332def coro_interfaces_arg_manager(event_loop, coro_scheduler: CoroSchedulerType): 333 acf = ArgsManager( 334 EArgs(event_loop, coro_scheduler, CoroType.auto), 335 ).callable(await_coro_fast) 336 atf = ArgsManager( 337 EArgs(event_loop, coro_scheduler), 338 ).callable(await_task_fast) 339 340 return EArgs(await_coro_fast=acf, await_task_fast=atf) 341 342 343class RunSchedulerInAsyncioLoop: 344 def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1): 345 self.scheduler = scheduler 346 self.scheduler.on_woke_up_callback = self.on_woke_up_callback 347 if idle_time is None: 348 idle_time = get_usable_min_sleep_interval() 349 350 self.idle_time = get_min_sleep_interval() * (idle_time // get_min_sleep_interval()) 351 self.min_sleep_interval = max(get_min_sleep_interval(), self.idle_time or 0) 352 self.usable_idle_time = (get_min_sleep_interval() * (idle_time // get_min_sleep_interval())) + get_countable_delta_time() 353 self.loop = loop or asyncio.get_event_loop() 354 self.handle = None # type: Optional[asyncio.Handle] 355 self.make_idle_when_possible = False 356 self.need_to_stop_when_possible = False 357 self.need_to_stop_now = False 358 self.in_idle_state = False 359 if execute_every_X_iterations < 1: 360 execute_every_X_iterations = 1 361 362 self.execute_every_X_iterations = execute_every_X_iterations 363 self.current_iteration = execute_every_X_iterations 364 365 def on_woke_up_callback(self): 366 if self.in_idle_state: 367 self.cancel_handle() 368 self.in_idle_state = False 369 self.register() 370 371 def __call__(self, *args, **kwargs): 372 self.current_iteration -= 1 373 if self.current_iteration: 374 self.register() 375 return 376 else: 377 self.current_iteration = self.execute_every_X_iterations 378 379 self.handle = None 380 self.in_idle_state = False 381 if self.need_to_stop_now: 382 self.need_to_stop_now = False 383 self.need_to_stop_when_possible = False 384 return 385 386 in_work = self.scheduler.iteration() 387 388 if (not in_work) and self.need_to_stop_when_possible: 389 self.need_to_stop_now = False 390 self.need_to_stop_when_possible = False 391 return 392 393 idle_time = self.scheduler.next_event_after() 394 # is_awake = self.scheduler.is_awake() 395 is_idle = self.scheduler.is_idle() 396 need_to_register = False 397 if self.make_idle_when_possible: 398 if not is_idle: 399 need_to_register = True 400 else: 401 need_to_register = True 402 403 if need_to_register: 404 self.register() 405 else: 406 self.register_idle(idle_time) 407 408 def ready_to_stop(self) -> bool: 409 return not self.scheduler.in_work() 410 411 def register(self): 412 # self.register_idle() 413 self.handle = self.loop.call_soon(self) 414 415 def register_idle(self, idle_time: Optional[float] = None): 416 if idle_time is None: 417 idle_time = self.usable_idle_time 418 419 # self.handle = self.loop.call_later(self.idle_time, self) 420 if idle_time < self.min_sleep_interval: 421 self.register() 422 else: 423 # self.handle = try_sleep(self.idle_time * (idle_time // self.idle_time), self.loop.call_later, self) 424 # # self.handle = try_sleep(0.001, self.loop.call_later, self) 425 self.in_idle_state = True 426 idle_time = (self.idle_time * (idle_time // self.idle_time)) + get_countable_delta_time() 427 self.handle = self.loop.call_later(idle_time, self) 428 429 def cancel_handle(self): 430 if self.handle and (not self.handle.cancelled()): 431 self.handle.cancel() 432 self.handle = None 433 434 def stop(self): 435 if self.handle and (not self.handle.cancelled()): 436 self.need_to_stop_now = True 437 self.handle.cancel() 438 self.handle = None 439 440 def stop_when_possible(self): 441 self.need_to_stop_when_possible = True 442 443 444# def call_soon_future(loop: Optional[asyncio.AbstractEventLoop], awaitable_coro_obj: Awaitable): 445# if __debug__: dlog('call_soon_future - start') 446# loop = loop or asyncio.get_event_loop() 447# 448# def handler(): 449# if __debug__: dlog('call_soon_future.handler - start') 450# loop.create_task(awaitable_coro_obj) 451# if __debug__: dlog('call_soon_future.handler - end') 452# 453# loop.get_event_loop().call_soon(handler) 454# if __debug__: dlog('call_soon_future - end') 455 456 457def call_soon_future(loop: Optional[asyncio.AbstractEventLoop], awaitable_coro_obj: Awaitable): 458 if __debug__: dlog('call_soon_future - start') 459 loop = loop or asyncio.get_event_loop() 460 461 def handler(): 462 if __debug__: dlog('call_soon_future.handler - start') 463 loop.create_task(awaitable_coro_obj) 464 if __debug__: dlog('call_soon_future.handler - end') 465 466 loop.get_event_loop().call_soon(handler) 467 if __debug__: dlog('call_soon_future - end') 468 469 470def create_task(loop: Optional[asyncio.AbstractEventLoop], acoro: Callable, *args, **kwargs) -> asyncio.Task: 471 if __debug__: dlog('call_soon_future - start') 472 loop = loop or asyncio.get_event_loop() 473 awaitable_coro_obj = acoro(*args, **kwargs) 474 return loop.create_task(awaitable_coro_obj) 475 476 477def create_task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], joiner: Optional[Awaitable[Any]], worker: Callable, *args, **kwargs): 478 async def sync_handler(loop, joiner, worker, args, kwargs): 479 worker_with_args = partial(worker, *args, **kwargs) 480 result = await loop.run_in_executor(None, worker_with_args) 481 if joiner is not None: 482 await joiner(result) 483 484 create_task(loop, sync_handler, loop, joiner, worker, args, kwargs) 485 486 487async def task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], worker: Callable, *args, **kwargs): 488 worker_with_args = partial(worker, *args, **kwargs) 489 return await loop.run_in_executor(None, worker_with_args)
98def await_coro_fast(loop: asyncio.AbstractEventLoop, 99 scheduler: CoroSchedulerType, coro_type: Optional[CoroType], coro_worker: Worker, *args, **kwargs 100 ) -> asyncio.Future: 101 coro_type = coro_type or CoroType.auto 102 if CoroType.auto == coro_type: 103 coro_type = find_coro_type(coro_worker) 104 105 future = loop.create_future() 106 if CoroType.awaitable == coro_type: 107 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _awaitable_async_coro_wrapper), future, coro_worker, *args, **kwargs) 108 elif CoroType.greenlet == coro_type: 109 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(coro_type, _async_coro_wrapper), future, coro_worker, *args, **kwargs) 110 else: 111 raise NotImplementedError 112 113 return future
120def await_coro_prim(coro_worker: Worker, *args, **kwargs) -> asyncio.Future: 121 """Tries to use primary coro scheduler (must be set before usage) 122 123 Args: 124 coro_worker (Worker): _description_ 125 126 Returns: 127 asyncio.Future: _description_ 128 """ 129 return await_coro_fast(asyncio.get_event_loop(), available_coro_scheduler(), CoroType.auto, coro_worker, *args, **kwargs)
Tries to use primary coro scheduler (must be set before usage)
Args: coro_worker (Worker): _description_
Returns: asyncio.Future: _description_
132def asyncio_coro(coro_worker: Worker) -> Coroutine: 133 """Decorator. Without arguments. Gives an ability to await any decorated Cengal coroutine from the async code 134 135 Args: 136 coro_worker (Worker): _description_ 137 138 Returns: 139 Coroutine: _description_ 140 """ 141 async def wrapper(*args, **kwargs): 142 return await await_coro_prim(coro_worker, *args, **kwargs) 143 144 coro_worker_sign: Signature = signature(coro_worker) 145 wrapper.__signature__ = coro_worker_sign.replace(parameters=tuple(coro_worker_sign.parameters.values())[1:], return_annotation=coro_worker_sign.return_annotation) 146 return wrapper
Decorator. Without arguments. Gives an ability to await any decorated Cengal coroutine from the async code
Args: coro_worker (Worker): _description_
Returns: Coroutine: _description_
156def await_task_fast(loop: asyncio.AbstractEventLoop, 157 scheduler: CoroSchedulerType, service_type: ServiceType, *args, **kwargs 158 ) -> asyncio.Future: 159 future = loop.create_future() 160 put_request_to_service_with_context(get_interface_and_loop_with_explicit_loop(scheduler), PutCoro, ExplicitWorker(CoroType.greenlet, _async_coro_wrapper), future, _async_task_runner_coro_worker, service_type, *args, **kwargs) 161 return future
168def await_task_prim(service_type: ServiceType, *args, **kwargs) -> asyncio.Future: 169 """Tries to use primary coro scheduler (must be set before usage) 170 171 Args: 172 service_type (ServiceType): _description_ 173 174 Returns: 175 asyncio.Future: _description_ 176 """ 177 return await_task_fast(asyncio.get_event_loop(), available_coro_scheduler(), service_type, *args, **kwargs)
Tries to use primary coro scheduler (must be set before usage)
Args: service_type (ServiceType): _description_
Returns: asyncio.Future: _description_
180def cs_awaitable(coro_worker: Callable) -> Callable: 181 """Decorator. Without arguments. Makes any Cengal coro awaitable from the async code (like coroutines in asyncio) 182 Example: 183 from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro 184 from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 185 from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro 186 from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep 187 import asyncio 188 189 @cs_awaitable 190 def my_coro_g(a: str, b: str) -> str: 191 i: Interface = current_interface() 192 i(Sleep, 0.1) 193 194 return a + b 195 196 @cs_awaitable 197 async def my_coro_a(a: str, b: str) -> str: 198 i: Interface = current_interface() 199 await i(Sleep, 0.05) 200 201 await asyncio.sleep(0.05) 202 203 return a + b 204 205 @cs_acoro 206 async def my_coro_a_implicit(a: str, b: str) -> str: 207 i: Interface = current_interface() 208 await i(Sleep, 0.05) 209 210 await asyncio.sleep(0.05) 211 212 return a + b 213 214 async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str: 215 await i(Sleep, 0.05) 216 217 await asyncio.sleep(0.05) 218 219 return a + b 220 221 async def my_coro_asyncio(a: str, b: str) -> str: 222 await asyncio.sleep(0.1) 223 224 return a + b 225 226 @cs_awaitable 227 async def amain(): 228 i: Interface = current_interface() 229 230 # await 231 232 print(await my_coro_g('a', 'b')) 233 print(await my_coro_a('a', 'b')) 234 print(await my_coro_a_implicit('a', 'b')) 235 print(await my_coro_a_explicit(i, 'a', 'b')) 236 print(await my_coro_asyncio('a', 'b')) 237 238 # RunCoro 239 240 print(await i(RunCoro, my_coro_g('a', 'b'))) 241 print(await i(RunCoro, my_coro_g, 'a', 'b')) 242 243 print(await i(RunCoro, my_coro_a('a', 'b'))) 244 print(await i(RunCoro, my_coro_a, 'a', 'b')) 245 246 print(await i(RunCoro, my_coro_a_implicit('a', 'b'))) 247 print(await i(RunCoro, my_coro_a_implicit, 'a', 'b')) 248 249 print(await i(RunCoro, my_coro_a_explicit, 'a', 'b')) 250 251 print(await i(RunCoro, my_coro_asyncio('a', 'b'))) 252 print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b')) 253 254 # PutCoro 255 256 await i(PutCoro, my_coro_g('a', 'b')) 257 await i(PutCoro, my_coro_g, 'a', 'b') 258 259 await i(PutCoro, my_coro_a('a', 'b')) 260 await i(PutCoro, my_coro_a, 'a', 'b') 261 262 await i(PutCoro, my_coro_a_implicit('a', 'b')) 263 await i(PutCoro, my_coro_a_implicit, 'a', 'b') 264 265 await i(PutCoro, my_coro_a_explicit, 'a', 'b') 266 267 await i(PutCoro, my_coro_asyncio('a', 'b')) 268 await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b') 269 270 @cs_awaitable 271 def main(): 272 i: Interface = current_interface() 273 274 # RunCoro 275 276 print(i(RunCoro, my_coro_g('a', 'b'))) 277 print(i(RunCoro, my_coro_g, 'a', 'b')) 278 279 print(i(RunCoro, my_coro_a('a', 'b'))) 280 print(i(RunCoro, my_coro_a, 'a', 'b')) 281 282 print(i(RunCoro, my_coro_a_implicit('a', 'b'))) 283 print(i(RunCoro, my_coro_a_implicit, 'a', 'b')) 284 285 print(i(RunCoro, my_coro_a_explicit, 'a', 'b')) 286 287 print(i(RunCoro, my_coro_asyncio('a', 'b'))) 288 print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b')) 289 290 # PutCoro 291 292 i(PutCoro, my_coro_g('a', 'b')) 293 i(PutCoro, my_coro_g, 'a', 'b') 294 295 i(PutCoro, my_coro_a('a', 'b')) 296 i(PutCoro, my_coro_a, 'a', 'b') 297 298 i(PutCoro, my_coro_a_implicit('a', 'b')) 299 i(PutCoro, my_coro_a_implicit, 'a', 'b') 300 301 i(PutCoro, my_coro_a_explicit, 'a', 'b') 302 303 i(PutCoro, my_coro_asyncio('a', 'b')) 304 i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b') 305 306 Args: 307 coro_worker (Worker): _description_ 308 309 Returns: 310 Coroutine: _description_ 311 """ 312 if is_async(coro_worker): 313 @wraps(coro_worker) 314 async def wrapper(*args, **kwargs) -> Any: 315 if isawaitable(coro_worker): 316 return await coro_worker 317 else: 318 return await coro_worker(*args, **kwargs) 319 320 wrapper: coro_worker 321 return wrapper 322 else: 323 @wraps(coro_worker) 324 async def wrapper(*args, **kwargs) -> Any: 325 from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 326 i: Interface = current_interface() 327 return await i(RunCoro, cs_coro(coro_worker), *args, **kwargs) 328 329 wrapper: coro_worker 330 return wrapper
Decorator. Without arguments. Makes any Cengal coro awaitable from the async code (like coroutines in asyncio) Example: from cengal.parallel_execution.coroutines.coro_scheduler import cs_acoro from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep import asyncio
@cs_awaitable
def my_coro_g(a: str, b: str) -> str:
i: Interface = current_interface()
i(Sleep, 0.1)
return a + b
@cs_awaitable
async def my_coro_a(a: str, b: str) -> str:
i: Interface = current_interface()
await i(Sleep, 0.05)
await asyncio.sleep(0.05)
return a + b
@cs_acoro
async def my_coro_a_implicit(a: str, b: str) -> str:
i: Interface = current_interface()
await i(Sleep, 0.05)
await asyncio.sleep(0.05)
return a + b
async def my_coro_a_explicit(i: Interface, a: str, b: str) -> str:
await i(Sleep, 0.05)
await asyncio.sleep(0.05)
return a + b
async def my_coro_asyncio(a: str, b: str) -> str:
await asyncio.sleep(0.1)
return a + b
@cs_awaitable
async def amain():
i: Interface = current_interface()
# await
print(await my_coro_g('a', 'b'))
print(await my_coro_a('a', 'b'))
print(await my_coro_a_implicit('a', 'b'))
print(await my_coro_a_explicit(i, 'a', 'b'))
print(await my_coro_asyncio('a', 'b'))
# RunCoro
print(await i(RunCoro, my_coro_g('a', 'b')))
print(await i(RunCoro, my_coro_g, 'a', 'b'))
print(await i(RunCoro, my_coro_a('a', 'b')))
print(await i(RunCoro, my_coro_a, 'a', 'b'))
print(await i(RunCoro, my_coro_a_implicit('a', 'b')))
print(await i(RunCoro, my_coro_a_implicit, 'a', 'b'))
print(await i(RunCoro, my_coro_a_explicit, 'a', 'b'))
print(await i(RunCoro, my_coro_asyncio('a', 'b')))
print(await i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))
# PutCoro
await i(PutCoro, my_coro_g('a', 'b'))
await i(PutCoro, my_coro_g, 'a', 'b')
await i(PutCoro, my_coro_a('a', 'b'))
await i(PutCoro, my_coro_a, 'a', 'b')
await i(PutCoro, my_coro_a_implicit('a', 'b'))
await i(PutCoro, my_coro_a_implicit, 'a', 'b')
await i(PutCoro, my_coro_a_explicit, 'a', 'b')
await i(PutCoro, my_coro_asyncio('a', 'b'))
await i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')
@cs_awaitable
def main():
i: Interface = current_interface()
# RunCoro
print(i(RunCoro, my_coro_g('a', 'b')))
print(i(RunCoro, my_coro_g, 'a', 'b'))
print(i(RunCoro, my_coro_a('a', 'b')))
print(i(RunCoro, my_coro_a, 'a', 'b'))
print(i(RunCoro, my_coro_a_implicit('a', 'b')))
print(i(RunCoro, my_coro_a_implicit, 'a', 'b'))
print(i(RunCoro, my_coro_a_explicit, 'a', 'b'))
print(i(RunCoro, my_coro_asyncio('a', 'b')))
print(i(RunCoro, cs_acoro(my_coro_asyncio), 'a', 'b'))
# PutCoro
i(PutCoro, my_coro_g('a', 'b'))
i(PutCoro, my_coro_g, 'a', 'b')
i(PutCoro, my_coro_a('a', 'b'))
i(PutCoro, my_coro_a, 'a', 'b')
i(PutCoro, my_coro_a_implicit('a', 'b'))
i(PutCoro, my_coro_a_implicit, 'a', 'b')
i(PutCoro, my_coro_a_explicit, 'a', 'b')
i(PutCoro, my_coro_asyncio('a', 'b'))
i(PutCoro, cs_acoro(my_coro_asyncio), 'a', 'b')
Args: coro_worker (Worker): _description_
Returns: Coroutine: _description_
344class RunSchedulerInAsyncioLoop: 345 def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1): 346 self.scheduler = scheduler 347 self.scheduler.on_woke_up_callback = self.on_woke_up_callback 348 if idle_time is None: 349 idle_time = get_usable_min_sleep_interval() 350 351 self.idle_time = get_min_sleep_interval() * (idle_time // get_min_sleep_interval()) 352 self.min_sleep_interval = max(get_min_sleep_interval(), self.idle_time or 0) 353 self.usable_idle_time = (get_min_sleep_interval() * (idle_time // get_min_sleep_interval())) + get_countable_delta_time() 354 self.loop = loop or asyncio.get_event_loop() 355 self.handle = None # type: Optional[asyncio.Handle] 356 self.make_idle_when_possible = False 357 self.need_to_stop_when_possible = False 358 self.need_to_stop_now = False 359 self.in_idle_state = False 360 if execute_every_X_iterations < 1: 361 execute_every_X_iterations = 1 362 363 self.execute_every_X_iterations = execute_every_X_iterations 364 self.current_iteration = execute_every_X_iterations 365 366 def on_woke_up_callback(self): 367 if self.in_idle_state: 368 self.cancel_handle() 369 self.in_idle_state = False 370 self.register() 371 372 def __call__(self, *args, **kwargs): 373 self.current_iteration -= 1 374 if self.current_iteration: 375 self.register() 376 return 377 else: 378 self.current_iteration = self.execute_every_X_iterations 379 380 self.handle = None 381 self.in_idle_state = False 382 if self.need_to_stop_now: 383 self.need_to_stop_now = False 384 self.need_to_stop_when_possible = False 385 return 386 387 in_work = self.scheduler.iteration() 388 389 if (not in_work) and self.need_to_stop_when_possible: 390 self.need_to_stop_now = False 391 self.need_to_stop_when_possible = False 392 return 393 394 idle_time = self.scheduler.next_event_after() 395 # is_awake = self.scheduler.is_awake() 396 is_idle = self.scheduler.is_idle() 397 need_to_register = False 398 if self.make_idle_when_possible: 399 if not is_idle: 400 need_to_register = True 401 else: 402 need_to_register = True 403 404 if need_to_register: 405 self.register() 406 else: 407 self.register_idle(idle_time) 408 409 def ready_to_stop(self) -> bool: 410 return not self.scheduler.in_work() 411 412 def register(self): 413 # self.register_idle() 414 self.handle = self.loop.call_soon(self) 415 416 def register_idle(self, idle_time: Optional[float] = None): 417 if idle_time is None: 418 idle_time = self.usable_idle_time 419 420 # self.handle = self.loop.call_later(self.idle_time, self) 421 if idle_time < self.min_sleep_interval: 422 self.register() 423 else: 424 # self.handle = try_sleep(self.idle_time * (idle_time // self.idle_time), self.loop.call_later, self) 425 # # self.handle = try_sleep(0.001, self.loop.call_later, self) 426 self.in_idle_state = True 427 idle_time = (self.idle_time * (idle_time // self.idle_time)) + get_countable_delta_time() 428 self.handle = self.loop.call_later(idle_time, self) 429 430 def cancel_handle(self): 431 if self.handle and (not self.handle.cancelled()): 432 self.handle.cancel() 433 self.handle = None 434 435 def stop(self): 436 if self.handle and (not self.handle.cancelled()): 437 self.need_to_stop_now = True 438 self.handle.cancel() 439 self.handle = None 440 441 def stop_when_possible(self): 442 self.need_to_stop_when_possible = True
345 def __init__(self, scheduler: CoroSchedulerType, idle_time: Optional[Union[int, float]]=None, loop: Optional[asyncio.AbstractEventLoop]=None, execute_every_X_iterations: int = 1): 346 self.scheduler = scheduler 347 self.scheduler.on_woke_up_callback = self.on_woke_up_callback 348 if idle_time is None: 349 idle_time = get_usable_min_sleep_interval() 350 351 self.idle_time = get_min_sleep_interval() * (idle_time // get_min_sleep_interval()) 352 self.min_sleep_interval = max(get_min_sleep_interval(), self.idle_time or 0) 353 self.usable_idle_time = (get_min_sleep_interval() * (idle_time // get_min_sleep_interval())) + get_countable_delta_time() 354 self.loop = loop or asyncio.get_event_loop() 355 self.handle = None # type: Optional[asyncio.Handle] 356 self.make_idle_when_possible = False 357 self.need_to_stop_when_possible = False 358 self.need_to_stop_now = False 359 self.in_idle_state = False 360 if execute_every_X_iterations < 1: 361 execute_every_X_iterations = 1 362 363 self.execute_every_X_iterations = execute_every_X_iterations 364 self.current_iteration = execute_every_X_iterations
416 def register_idle(self, idle_time: Optional[float] = None): 417 if idle_time is None: 418 idle_time = self.usable_idle_time 419 420 # self.handle = self.loop.call_later(self.idle_time, self) 421 if idle_time < self.min_sleep_interval: 422 self.register() 423 else: 424 # self.handle = try_sleep(self.idle_time * (idle_time // self.idle_time), self.loop.call_later, self) 425 # # self.handle = try_sleep(0.001, self.loop.call_later, self) 426 self.in_idle_state = True 427 idle_time = (self.idle_time * (idle_time // self.idle_time)) + get_countable_delta_time() 428 self.handle = self.loop.call_later(idle_time, self)
333def coro_interfaces_arg_manager(event_loop, coro_scheduler: CoroSchedulerType): 334 acf = ArgsManager( 335 EArgs(event_loop, coro_scheduler, CoroType.auto), 336 ).callable(await_coro_fast) 337 atf = ArgsManager( 338 EArgs(event_loop, coro_scheduler), 339 ).callable(await_task_fast) 340 341 return EArgs(await_coro_fast=acf, await_task_fast=atf)
471def create_task(loop: Optional[asyncio.AbstractEventLoop], acoro: Callable, *args, **kwargs) -> asyncio.Task: 472 if __debug__: dlog('call_soon_future - start') 473 loop = loop or asyncio.get_event_loop() 474 awaitable_coro_obj = acoro(*args, **kwargs) 475 return loop.create_task(awaitable_coro_obj)
478def create_task_in_thread_pool(loop: Optional[asyncio.AbstractEventLoop], joiner: Optional[Awaitable[Any]], worker: Callable, *args, **kwargs): 479 async def sync_handler(loop, joiner, worker, args, kwargs): 480 worker_with_args = partial(worker, *args, **kwargs) 481 result = await loop.run_in_executor(None, worker_with_args) 482 if joiner is not None: 483 await joiner(result) 484 485 create_task(loop, sync_handler, loop, joiner, worker, args, kwargs)