cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop.versions.v_0.asyncio_loop
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['AsyncioLoop', 'AsyncioLoopRequest', 'AsyncioLoopWasNotSetError', 'run_in_thread_pool', 'run_in_thread_pool_fast'] 20 21from cengal.parallel_execution.coroutines.coro_scheduler import * 22from cengal.parallel_execution.coroutines.coro_tools.await_coro import create_task 23from cengal.parallel_execution.coroutines.coro_tools.await_coro import task_in_thread_pool 24from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly, agly, CoroPriority, LoopYieldPriorityScheduler 25from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield 26from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep 27from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro 28from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro import ThrowCoro 29from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import KillCoro 30from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest, CoroutineNotFoundError 31from cengal.parallel_execution.asyncio.run_loop import run_forever, cancel_all_tasks 32from cengal.parallel_execution.asyncio.atasks import create_task_awaitable 33from cengal.file_system.file_manager import path_relative_to_current_dir 34from cengal.time_management.cpu_clock_cycles import perf_counter 35from cengal.time_management.sleep_tools import get_usable_min_sleep_interval, get_min_sleep_interval 36from cengal.data_manipulation.serialization import * 37from cengal.introspection.inspect import get_exception 38from cengal.math.numbers import RationalNumber 39from typing import Callable, Tuple, List, Any, Dict, Awaitable, Type 40import sys 41import os 42from asyncio import AbstractEventLoop, get_event_loop, get_running_loop, Task as asyncio_Task, sleep as asyncio_sleep 43from asyncio.coroutines import _is_coroutine 44from cengal.code_flow_control.args_manager import args_kwargs 45from types import coroutine as types_coroutine 46from asyncio import coroutines 47from asyncio import events 48from asyncio import tasks 49import threading 50 51 52""" 53Module Docstring 54Docstrings: http://www.python.org/dev/peps/pep-0257/ 55""" 56 57__author__ = "ButenkoMS <gtalk@butenkoms.space>" 58__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 59__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 60__license__ = "Apache License, Version 2.0" 61__version__ = "4.4.1" 62__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 63__email__ = "gtalk@butenkoms.space" 64# __status__ = "Prototype" 65__status__ = "Development" 66# __status__ = "Production" 67 68 69class AsyncioLoopWasNotSetError(Exception): 70 pass 71 72 73class ExternalAsyncioLoopAlreadyExistsError(Exception): 74 pass 75 76 77class WaitingCancelled(Exception): 78 pass 79 80 81WAITING_FOR_NEW_REQUESTS_EVENT = 'AsyncioLoopRequest - WaitingForNewRequestsEvent' 82 83 84class AsyncioLoopRequest(ServiceRequest): 85 def inherit_surrounding_loop(self) -> AbstractEventLoop: 86 return self._save(0) 87 88 def start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> AbstractEventLoop: 89 return self._save(1, main_awaitable, priority, interrupt_when_no_requests, debug) 90 91 def ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> AbstractEventLoop: 92 return self._save(2, main_awaitable, priority, interrupt_when_no_requests) 93 94 def set(self, async_loop) -> None: 95 return self._save(3, async_loop) 96 97 def get(self) -> AbstractEventLoop: 98 return self._save(4) 99 100 def wait(self, awaitable: Awaitable) -> Any: 101 return self._save(5, awaitable, False, True) 102 103 def create_task(self, awaitable: Awaitable) -> asyncio_Task: 104 return self._save(6, awaitable) 105 106 def _internal_loop_yield(self) -> None: 107 return self._save(7) 108 109 def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Optional[Callable]: 110 return self._save(8, turn_on) 111 112 def _wait_intercommunication(self, awaitable: Awaitable) -> Any: 113 return self._save(9, awaitable, True, True) 114 115 def wait_idle(self, awaitable: Awaitable) -> Any: 116 return self._save(10, awaitable, False, False) 117 118 def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any: 119 return self._save(12, use_higher_level_sleep_manager) 120 121 def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any: 122 return self._save(13, low_latency_io_mode) 123 124 125async def asyncio_coro_sleep_0(): 126 return await asyncio_sleep(0) 127 128 129def request_giver(request): 130 yield request 131 132@types_coroutine 133def asyncio_coro_request(request): 134 return (yield from request_giver(request)) 135 136asyncio_coro_request._is_coroutine = _is_coroutine 137 138 139class AsyncioLoop(Service, EntityStatsMixin): 140 def __init__(self, loop: CoroSchedulerType): 141 super(AsyncioLoop, self).__init__(loop) 142 self.async_loop: Optional[AbstractEventLoop] = None 143 self.internal_async_loop: Optional[AbstractEventLoop] = None 144 self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None 145 self.internal_loop_start_waiters: Set[CoroID] = set() 146 self.need_to_stop_internal_loop: bool = False 147 self.internal_loop_creation_error: Optional[Exception] = None 148 self.internal_loop_in_yield: bool = False 149 self.waiting_for_new_requests: bool = False 150 self.loops_intercommunication: bool = False 151 self._previous_on_wrong_request = None 152 self.intercommunication_requests_coro_ids: Set[CoroID] = set() 153 154 self._request_workers = { 155 0: self._on_inherit_surrounding_loop, 156 1: self._on_start_internal_loop, 157 2: self._on_ensure_loop, 158 3: self._on_set, 159 4: self._on_get, 160 5: self._on_await, 161 6: self._on_create_task, 162 7: self._on__internal_loop_yield, 163 8: self._on_turn_on_loops_intercommunication, 164 9: self._on_await, 165 10: self._on_await, 166 11: self._on__internal_wait_for_new_requests, 167 12: self._on__use_higher_level_sleep_manager, 168 13: self._on__low_latency_io_mode, 169 } 170 171 self.pending_requests_num: int = 0 172 self.new_requests_num: int = 0 173 self.no_idle_calls: Set[CoroID] = set() 174 self.results: Dict[CoroID, Tuple[Any, Exception]] = dict() 175 self._waiting_coro_id: Optional[CoroID] = None 176 self._original_loop_class: Type = None 177 self._idle_for: Optional[RationalNumber] = None # in seconds 178 self.use_higher_level_sleep_manager: bool = False 179 self.current_on_idle_handler: Optional[Callable] = None 180 self.low_latency_io_mode: int = 0 181 182 def destroy(self): 183 if self.internal_async_loop is not None: 184 events._set_running_loop(None) 185 186 def _on_system_loop_idle(self, next_event_after: Optional[RationalNumber]): 187 if next_event_after is None: 188 self._idle_for = max(0.001, get_usable_min_sleep_interval()) 189 else: 190 self._idle_for = max(0.001, next_event_after) 191 192 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 193 return type(self).__name__, { 194 'pending requests num': self.pending_requests_num 195 } 196 197 def single_task_registration_or_immediate_processing( 198 self, request: Optional[AsyncioLoopRequest]=None) -> Tuple[bool, None, None]: 199 if request is not None: 200 return self.resolve_request(request) 201 return True, None, None 202 203 def full_processing_iteration(self): 204 from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event 205 if (self._waiting_coro_id is not None) and self.new_requests_num: 206 # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro) 207 # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled) 208 kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro) 209 kill_coro_service._add_direct_request(self._waiting_coro_id) 210 self._waiting_coro_id = None 211 try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None) 212 213 if self.internal_loop_in_yield: 214 if self.pending_requests_num: 215 self.register_response(self._internal_loop_holding_coro.coro_id, None, None) 216 self.internal_loop_in_yield = False 217 218 if self.internal_loop_start_waiters: 219 loop_response = None 220 exception_response = None 221 if self.internal_async_loop is not None: 222 loop_response = self.async_loop = self.internal_async_loop 223 elif self.internal_loop_creation_error is not None: 224 exception_response = self.internal_loop_creation_error 225 226 if loop_response or exception_response: 227 for coro_id in self.internal_loop_start_waiters: 228 self.register_response(coro_id, loop_response, exception_response) 229 230 self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)() 231 232 for coro_id, response in self.results.items(): 233 result, exception = response 234 if coro_id in self.intercommunication_requests_coro_ids: 235 self.intercommunication_requests_coro_ids.remove(coro_id) 236 self._responses.append(DirectResponse(coro_id, type(self), result, exception)) 237 else: 238 self.register_response(coro_id, result, exception) 239 240 self.pending_requests_num -= len(self.results) 241 self.results = type(self.results)() 242 243 if not self.no_idle_calls: 244 self.make_dead() 245 246 def is_low_latency(self) -> bool: 247 return True 248 249 def in_work(self) -> bool: 250 result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results) 251 return self.thrifty_in_work(result) 252 253 def _on_inherit_surrounding_loop(self) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 254 exception = None 255 try: 256 self.async_loop = get_running_loop() 257 except: 258 exception = get_exception() 259 260 return True, self.async_loop, exception 261 262 def _on_start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 263 if self.internal_async_loop: 264 self.async_loop = self.internal_async_loop 265 return True, self.async_loop, None 266 267 coro_id = self.current_caller_coro_info.coro_id 268 self.internal_loop_start_waiters.add(coro_id) 269 if self._internal_loop_holding_coro is None: 270 self.internal_async_loop = None 271 self.internal_loop_creation_error = None 272 try: 273 if self._is_asyncio_loop_has_run_once_method(): 274 internal_loop_holding_coro_worker = ExplicitWorker(CoroType.awaitable, _internal_loop_holding_coro_run_once_based) 275 else: 276 internal_loop_holding_coro_worker = ExplicitWorker(CoroType.greenlet, _internal_loop_holding_coro) 277 except ExternalAsyncioLoopAlreadyExistsError as ex: 278 return True, None, ex 279 280 self._internal_loop_holding_coro = self._loop.put_coro(internal_loop_holding_coro_worker, self, main_awaitable, priority, interrupt_when_no_requests, debug) 281 if interrupt_when_no_requests: 282 self._internal_loop_holding_coro.is_background_coro = True 283 284 return False, None, None 285 286 def _is_asyncio_loop_has_run_once_method(self) -> bool: 287 result = False 288 if events._get_running_loop() is not None: 289 raise ExternalAsyncioLoopAlreadyExistsError 290 291 loop = None 292 try: 293 loop = events.new_event_loop() 294 if hasattr(loop, '_run_once'): 295 result = True 296 finally: 297 if loop is not None: 298 loop.close() 299 300 return result 301 302 def _on_ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 303 result_exists, result, exception = self._on_get() 304 if result and (exception is None): 305 return result_exists, result, exception 306 307 result_exists, result, exception = self._on_inherit_surrounding_loop() 308 if result and (exception is None): 309 return result_exists, result, exception 310 311 return self._on_start_internal_loop(main_awaitable, priority, interrupt_when_no_requests) 312 313 def _on_set(self, async_loop): 314 self.async_loop = async_loop 315 return True, None, None 316 317 def _on_get(self): 318 if self.async_loop is None: 319 exception = AsyncioLoopWasNotSetError() 320 else: 321 exception = None 322 323 return True, self.async_loop, exception 324 325 def register_await_response(self, coro_id: CoroID, response: Any, exception: Optional[Exception]): 326 self.results[coro_id] = (response, exception) 327 self.no_idle_calls.discard(coro_id) 328 self.make_live() 329 330 def _on_await(self, awaitable: Awaitable, intercommunication_request: bool = False, prevent_idle: bool = True) -> Tuple[bool, Any, Optional[Exception]]: 331 if self.async_loop is None: 332 return True, None, AsyncioLoopWasNotSetError() 333 334 coro_id = self.current_caller_coro_info.coro_id 335 if intercommunication_request: 336 self.intercommunication_requests_coro_ids.add(coro_id) 337 338 async def awaiting_worker(asyncio_loop_instance: AsyncioLoop, coro_id: CoroID, awaitable: Awaitable): 339 exception = None 340 result = None 341 try: 342 result = await awaitable 343 except: 344 exception = get_exception() 345 346 asyncio_loop_instance.register_await_response(coro_id, result, exception) 347 348 create_task(self.async_loop, awaiting_worker, self, coro_id, awaitable) 349 350 self.pending_requests_num += 1 351 if prevent_idle: 352 self.no_idle_calls.add(coro_id) 353 354 self.make_live() 355 356 return False, None, None 357 358 def _on_create_task(self, awaitable: Awaitable) -> Tuple[bool, Optional[asyncio_Task], Optional[Exception]]: 359 if self.async_loop is None: 360 return True, None, AsyncioLoopWasNotSetError() 361 362 async def awaiting_wrapper(awaitable: Awaitable): 363 return await awaitable 364 365 task: asyncio_Task = create_task(self.async_loop, awaiting_wrapper, awaitable) 366 return True, task, None 367 368 def _on__internal_loop_yield(self) -> Tuple[bool, None, None]: 369 if self.pending_requests_num: 370 return True, None, None 371 else: 372 self.internal_loop_in_yield = True 373 return False, None, None 374 375 def _on__internal_wait_for_new_requests(self) -> Tuple[bool, None, None]: 376 if self.new_requests_num: 377 return True, None, None 378 else: 379 self.waiting_for_new_requests = True 380 return False, None, None 381 382 def register_new_asyncio_request(self) -> None: 383 self.new_requests_num += 1 384 self.make_live() 385 386 def add_on_idle_handler(self) -> None: 387 if not self.use_higher_level_sleep_manager: 388 self.current_on_idle_handler = self._on_system_loop_idle 389 self._loop.on_idle_handlers.add(self._on_system_loop_idle) 390 391 def discard_on_idle_handler(self) -> None: 392 self._loop.on_idle_handlers.discard(self._on_system_loop_idle) 393 self.current_on_idle_handler = None 394 395 def _on__use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool) -> Tuple[bool, Optional[None], None]: 396 self.use_higher_level_sleep_manager = use_higher_level_sleep_manager 397 if use_higher_level_sleep_manager: 398 if self.current_on_idle_handler is not None: 399 self._loop.on_idle_handlers.add(self.current_on_idle_handler) 400 else: 401 self.discard_on_idle_handler() 402 403 return True, None, None 404 405 def _on__low_latency_io_mode(self, low_latency_io_mode: bool) -> Tuple[bool, Optional[bool], None]: 406 buff_low_latency_io_mode = self.low_latency_io_mode > 0 407 if low_latency_io_mode: 408 self.low_latency_io_mode += 1 409 else: 410 self.low_latency_io_mode -= 1 411 412 return True, buff_low_latency_io_mode, None 413 414 def _on_turn_on_loops_intercommunication(self, turn_on: bool) -> Tuple[bool, Optional[Callable], None]: 415 result = self._loop.on_wrong_request 416 if turn_on: 417 self._previous_on_wrong_request = self._loop.on_wrong_request 418 self.loops_intercommunication = True 419 self._loop.on_wrong_request = self._on_wrong_request 420 else: 421 if self.loops_intercommunication: 422 self._loop.on_wrong_request = self._previous_on_wrong_request 423 self.loops_intercommunication = False 424 425 self._previous_on_wrong_request = None 426 427 return True, result, None 428 429 def _on_wrong_request(self, coro: CoroWrapperBase, request: Any) -> Request: 430 if request is None: 431 args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_sleep_0())) 432 result: Request = Request(coro, type(self), *args, **kwargs) 433 else: 434 args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_request(request))) 435 result = Request(coro, type(self), *args, **kwargs) 436 437 return result 438 439 def inline_get(self): 440 if self.async_loop is None: 441 raise AsyncioLoopWasNotSetError 442 else: 443 return self.async_loop 444 445 def inline_set_internal_loop(self, loop, exception: Optional[Exception]): 446 self.internal_async_loop = loop 447 self.internal_loop_creation_error = exception 448 self.make_live() 449 450 def is_need_to_yield_internal_loop(self) -> bool: 451 return not self.pending_requests_num 452 453 454AsyncioLoopRequest.default_service_type = AsyncioLoop 455 456 457def _internal_loop_holding_coro(i: Interface, service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False): 458 ly = None 459 if priority is not None: 460 ly = gly(priority) 461 462 async def main_wrapper(service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None): 463 loop: AbstractEventLoop = get_event_loop() 464 service.inline_set_internal_loop(loop, None) 465 if interrupt_when_no_requests is None: 466 interrupt_when_no_requests = main_awaitable is None 467 468 def on_loop_simple_yield(): 469 if interrupt_when_no_requests and service.is_need_to_yield_internal_loop(): 470 i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield()) 471 else: 472 i(Yield) 473 474 if not service.need_to_stop_internal_loop: 475 if service._idle_for is None: 476 loop.call_soon(on_loop_simple_yield) 477 else: 478 idle_for = service._idle_for 479 service._idle_for = None 480 loop.call_later(idle_for, on_loop_simple_yield) 481 482 def on_loop_loop_yield(): 483 if interrupt_when_no_requests and service.is_need_to_yield_internal_loop(): 484 i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield()) 485 else: 486 ly() 487 488 if not service.need_to_stop_internal_loop: 489 if service._idle_for is None: 490 loop.call_soon(on_loop_loop_yield) 491 else: 492 idle_for = service._idle_for 493 service._idle_for = None 494 loop.call_later(idle_for, on_loop_loop_yield) 495 496 if ly is None: 497 loop.call_soon(on_loop_simple_yield) 498 else: 499 loop.call_soon(on_loop_loop_yield) 500 501 if main_awaitable is not None: 502 create_task_awaitable(main_awaitable) 503 504 exception = None 505 try: 506 service.add_on_idle_handler() 507 run_forever(main_wrapper(service, main_awaitable, priority, interrupt_when_no_requests), debug=debug) 508 except: 509 exception = get_exception() 510 # TODO: do something with exception 511 finally: 512 try: 513 service._loop.on_idle_handlers.discard(service._on_system_loop_idle) 514 service.current_on_idle_handler = None 515 except ValueError: 516 pass 517 518 service.inline_set_internal_loop(None, exception) 519 520 521async def _internal_loop_holding_coro_run_once_based(i: Interface, service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False): 522 from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event 523 from .known_asyncio_compatible_loops import prepare_loop, restore_loop 524 525 cs: CoroSchedulerType = current_coro_scheduler() 526 lyps: LoopYieldPriorityScheduler = cs.get_service_instance(LoopYieldPriorityScheduler) 527 umsi: RationalNumber = get_usable_min_sleep_interval() 528 529 ly = None 530 if priority is not None: 531 ly = await agly(priority) 532 533 async def main_wrapper_for_run_once(service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None): 534 loop: AbstractEventLoop = get_event_loop() 535 service.inline_set_internal_loop(loop, None) 536 if main_awaitable is not None: 537 create_task_awaitable(main_awaitable) 538 539 if interrupt_when_no_requests is None: 540 interrupt_when_no_requests = main_awaitable is None 541 542 main_wrapper_for_run_once_coro = main_wrapper_for_run_once(service, main_awaitable) 543 544 exception = None 545 try: 546 if events._get_running_loop() is not None: 547 raise RuntimeError( 548 'run_forever() cannot be called from a running event loop') 549 550 if not coroutines.iscoroutine(main_wrapper_for_run_once_coro): 551 raise ValueError('a coroutine was expected, got {!r}'.format(main_wrapper_for_run_once)) 552 553 loop = events.new_event_loop() 554 service._original_loop_class = prepare_loop(loop) 555 try: 556 events.set_event_loop(loop) 557 loop.set_debug(debug) 558 loop.create_task(main_wrapper_for_run_once_coro) 559 loop._check_closed() 560 loop._check_running() 561 loop._set_coroutine_origin_tracking(loop._debug) 562 loop._thread_id = threading.get_ident() 563 564 old_agen_hooks = sys.get_asyncgen_hooks() 565 sys.set_asyncgen_hooks(firstiter=loop._asyncgen_firstiter_hook, 566 finalizer=loop._asyncgen_finalizer_hook) 567 try: 568 events._set_running_loop(loop) 569 while True: 570 if ly is None: 571 await i(Yield) 572 else: 573 await ly() 574 575 loop.call_soon(lambda: None) 576 # if (not loop._ready) and (not loop._stopping) and (not loop._scheduled): 577 # loop.call_soon(lambda: None) 578 579 loop._run_once() 580 if loop._stopping: 581 break 582 583 if (not loop._ready) and (not loop._stopping): 584 if not loop._scheduled: 585 await i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield()) 586 continue 587 else: 588 when = loop._scheduled[0]._when 589 from asyncio.base_events import MAXIMUM_SELECT_TIMEOUT 590 timeout = min(max(0, when - loop.time()), MAXIMUM_SELECT_TIMEOUT) 591 # if get_min_sleep_interval() <= timeout: 592 # usable_min_sleep_interval = get_usable_min_sleep_interval() 593 # if usable_min_sleep_interval > timeout: 594 # timeout = usable_min_sleep_interval 595 596 if (not service.low_latency_io_mode) and (0.001 <= timeout) and (not service.results): 597 service.new_requests_num = 0 598 service.make_dead() 599 async def waiting_coro(i: Interface, timeout: float): 600 from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event 601 try: 602 await i(Sleep, timeout) 603 await i(AsyncEventBusRequest().send_event(WAITING_FOR_NEW_REQUESTS_EVENT, None)) 604 except WaitingCancelled: 605 print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WaitingCancelled') 606 607 from datetime import datetime 608 try: 609 # TODO: reimplement it in more efficient and elegant way. Btw: Sleep currently will not cancel an event upon coro destroyed 610 lyps_max_delay = lyps.max_delay 611 new_max_timeout = umsi if lyps_max_delay < umsi else lyps_max_delay 612 new_timeout = timeout if new_max_timeout > timeout else new_max_timeout 613 waiting_coro_id = await i(PutCoro, waiting_coro, new_timeout) 614 service._waiting_coro_id = waiting_coro_id 615 # await i(WaitCoro, WaitCoroRequest().single(waiting_coro_id)) 616 await i(AsyncEventBusRequest().wait(WAITING_FOR_NEW_REQUESTS_EVENT)) 617 # print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WAITING_FOR_NEW_REQUESTS_EVENT') 618 except (WaitingCancelled, CoroutineNotFoundError): 619 print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WaitingCancelled or CoroutineNotFoundError') 620 pass 621 except: 622 print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> {get_exception()}') 623 raise 624 625 if interrupt_when_no_requests and service.is_need_to_yield_internal_loop(): 626 await i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield()) 627 continue 628 finally: 629 loop._stopping = False 630 loop._thread_id = None 631 events._set_running_loop(None) 632 loop._set_coroutine_origin_tracking(False) 633 sys.set_asyncgen_hooks(*old_agen_hooks) 634 finally: 635 restore_loop(loop, service._original_loop_class) 636 try: 637 cancel_all_tasks(loop) 638 loop.run_until_complete(loop.shutdown_asyncgens()) 639 finally: 640 events.set_event_loop(None) 641 loop.close() 642 except: 643 exception = get_exception() 644 finally: 645 service.inline_set_internal_loop(None, exception) 646 647 648def run_in_thread_pool_fast(interface: Interface, function: Callable, *args, **kwargs) -> Any: 649 async def task_wrapper(loop, *args, **kwargs): 650 return await task_in_thread_pool(loop, function, *args, **kwargs) 651 652 return interface(AsyncioLoop, AsyncioLoopRequest().wait(task_wrapper( 653 interface._loop.get_service_instance(AsyncioLoop).inline_get(), 654 *args, **kwargs))) 655 656 657def run_in_thread_pool(function: Callable, *args, **kwargs) -> Any: 658 return run_in_thread_pool_fast(current_interface(), function, *args, **kwargs)
class
AsyncioLoop(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
140class AsyncioLoop(Service, EntityStatsMixin): 141 def __init__(self, loop: CoroSchedulerType): 142 super(AsyncioLoop, self).__init__(loop) 143 self.async_loop: Optional[AbstractEventLoop] = None 144 self.internal_async_loop: Optional[AbstractEventLoop] = None 145 self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None 146 self.internal_loop_start_waiters: Set[CoroID] = set() 147 self.need_to_stop_internal_loop: bool = False 148 self.internal_loop_creation_error: Optional[Exception] = None 149 self.internal_loop_in_yield: bool = False 150 self.waiting_for_new_requests: bool = False 151 self.loops_intercommunication: bool = False 152 self._previous_on_wrong_request = None 153 self.intercommunication_requests_coro_ids: Set[CoroID] = set() 154 155 self._request_workers = { 156 0: self._on_inherit_surrounding_loop, 157 1: self._on_start_internal_loop, 158 2: self._on_ensure_loop, 159 3: self._on_set, 160 4: self._on_get, 161 5: self._on_await, 162 6: self._on_create_task, 163 7: self._on__internal_loop_yield, 164 8: self._on_turn_on_loops_intercommunication, 165 9: self._on_await, 166 10: self._on_await, 167 11: self._on__internal_wait_for_new_requests, 168 12: self._on__use_higher_level_sleep_manager, 169 13: self._on__low_latency_io_mode, 170 } 171 172 self.pending_requests_num: int = 0 173 self.new_requests_num: int = 0 174 self.no_idle_calls: Set[CoroID] = set() 175 self.results: Dict[CoroID, Tuple[Any, Exception]] = dict() 176 self._waiting_coro_id: Optional[CoroID] = None 177 self._original_loop_class: Type = None 178 self._idle_for: Optional[RationalNumber] = None # in seconds 179 self.use_higher_level_sleep_manager: bool = False 180 self.current_on_idle_handler: Optional[Callable] = None 181 self.low_latency_io_mode: int = 0 182 183 def destroy(self): 184 if self.internal_async_loop is not None: 185 events._set_running_loop(None) 186 187 def _on_system_loop_idle(self, next_event_after: Optional[RationalNumber]): 188 if next_event_after is None: 189 self._idle_for = max(0.001, get_usable_min_sleep_interval()) 190 else: 191 self._idle_for = max(0.001, next_event_after) 192 193 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 194 return type(self).__name__, { 195 'pending requests num': self.pending_requests_num 196 } 197 198 def single_task_registration_or_immediate_processing( 199 self, request: Optional[AsyncioLoopRequest]=None) -> Tuple[bool, None, None]: 200 if request is not None: 201 return self.resolve_request(request) 202 return True, None, None 203 204 def full_processing_iteration(self): 205 from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event 206 if (self._waiting_coro_id is not None) and self.new_requests_num: 207 # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro) 208 # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled) 209 kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro) 210 kill_coro_service._add_direct_request(self._waiting_coro_id) 211 self._waiting_coro_id = None 212 try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None) 213 214 if self.internal_loop_in_yield: 215 if self.pending_requests_num: 216 self.register_response(self._internal_loop_holding_coro.coro_id, None, None) 217 self.internal_loop_in_yield = False 218 219 if self.internal_loop_start_waiters: 220 loop_response = None 221 exception_response = None 222 if self.internal_async_loop is not None: 223 loop_response = self.async_loop = self.internal_async_loop 224 elif self.internal_loop_creation_error is not None: 225 exception_response = self.internal_loop_creation_error 226 227 if loop_response or exception_response: 228 for coro_id in self.internal_loop_start_waiters: 229 self.register_response(coro_id, loop_response, exception_response) 230 231 self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)() 232 233 for coro_id, response in self.results.items(): 234 result, exception = response 235 if coro_id in self.intercommunication_requests_coro_ids: 236 self.intercommunication_requests_coro_ids.remove(coro_id) 237 self._responses.append(DirectResponse(coro_id, type(self), result, exception)) 238 else: 239 self.register_response(coro_id, result, exception) 240 241 self.pending_requests_num -= len(self.results) 242 self.results = type(self.results)() 243 244 if not self.no_idle_calls: 245 self.make_dead() 246 247 def is_low_latency(self) -> bool: 248 return True 249 250 def in_work(self) -> bool: 251 result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results) 252 return self.thrifty_in_work(result) 253 254 def _on_inherit_surrounding_loop(self) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 255 exception = None 256 try: 257 self.async_loop = get_running_loop() 258 except: 259 exception = get_exception() 260 261 return True, self.async_loop, exception 262 263 def _on_start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 264 if self.internal_async_loop: 265 self.async_loop = self.internal_async_loop 266 return True, self.async_loop, None 267 268 coro_id = self.current_caller_coro_info.coro_id 269 self.internal_loop_start_waiters.add(coro_id) 270 if self._internal_loop_holding_coro is None: 271 self.internal_async_loop = None 272 self.internal_loop_creation_error = None 273 try: 274 if self._is_asyncio_loop_has_run_once_method(): 275 internal_loop_holding_coro_worker = ExplicitWorker(CoroType.awaitable, _internal_loop_holding_coro_run_once_based) 276 else: 277 internal_loop_holding_coro_worker = ExplicitWorker(CoroType.greenlet, _internal_loop_holding_coro) 278 except ExternalAsyncioLoopAlreadyExistsError as ex: 279 return True, None, ex 280 281 self._internal_loop_holding_coro = self._loop.put_coro(internal_loop_holding_coro_worker, self, main_awaitable, priority, interrupt_when_no_requests, debug) 282 if interrupt_when_no_requests: 283 self._internal_loop_holding_coro.is_background_coro = True 284 285 return False, None, None 286 287 def _is_asyncio_loop_has_run_once_method(self) -> bool: 288 result = False 289 if events._get_running_loop() is not None: 290 raise ExternalAsyncioLoopAlreadyExistsError 291 292 loop = None 293 try: 294 loop = events.new_event_loop() 295 if hasattr(loop, '_run_once'): 296 result = True 297 finally: 298 if loop is not None: 299 loop.close() 300 301 return result 302 303 def _on_ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> Tuple[bool, Optional[AbstractEventLoop], Exception]: 304 result_exists, result, exception = self._on_get() 305 if result and (exception is None): 306 return result_exists, result, exception 307 308 result_exists, result, exception = self._on_inherit_surrounding_loop() 309 if result and (exception is None): 310 return result_exists, result, exception 311 312 return self._on_start_internal_loop(main_awaitable, priority, interrupt_when_no_requests) 313 314 def _on_set(self, async_loop): 315 self.async_loop = async_loop 316 return True, None, None 317 318 def _on_get(self): 319 if self.async_loop is None: 320 exception = AsyncioLoopWasNotSetError() 321 else: 322 exception = None 323 324 return True, self.async_loop, exception 325 326 def register_await_response(self, coro_id: CoroID, response: Any, exception: Optional[Exception]): 327 self.results[coro_id] = (response, exception) 328 self.no_idle_calls.discard(coro_id) 329 self.make_live() 330 331 def _on_await(self, awaitable: Awaitable, intercommunication_request: bool = False, prevent_idle: bool = True) -> Tuple[bool, Any, Optional[Exception]]: 332 if self.async_loop is None: 333 return True, None, AsyncioLoopWasNotSetError() 334 335 coro_id = self.current_caller_coro_info.coro_id 336 if intercommunication_request: 337 self.intercommunication_requests_coro_ids.add(coro_id) 338 339 async def awaiting_worker(asyncio_loop_instance: AsyncioLoop, coro_id: CoroID, awaitable: Awaitable): 340 exception = None 341 result = None 342 try: 343 result = await awaitable 344 except: 345 exception = get_exception() 346 347 asyncio_loop_instance.register_await_response(coro_id, result, exception) 348 349 create_task(self.async_loop, awaiting_worker, self, coro_id, awaitable) 350 351 self.pending_requests_num += 1 352 if prevent_idle: 353 self.no_idle_calls.add(coro_id) 354 355 self.make_live() 356 357 return False, None, None 358 359 def _on_create_task(self, awaitable: Awaitable) -> Tuple[bool, Optional[asyncio_Task], Optional[Exception]]: 360 if self.async_loop is None: 361 return True, None, AsyncioLoopWasNotSetError() 362 363 async def awaiting_wrapper(awaitable: Awaitable): 364 return await awaitable 365 366 task: asyncio_Task = create_task(self.async_loop, awaiting_wrapper, awaitable) 367 return True, task, None 368 369 def _on__internal_loop_yield(self) -> Tuple[bool, None, None]: 370 if self.pending_requests_num: 371 return True, None, None 372 else: 373 self.internal_loop_in_yield = True 374 return False, None, None 375 376 def _on__internal_wait_for_new_requests(self) -> Tuple[bool, None, None]: 377 if self.new_requests_num: 378 return True, None, None 379 else: 380 self.waiting_for_new_requests = True 381 return False, None, None 382 383 def register_new_asyncio_request(self) -> None: 384 self.new_requests_num += 1 385 self.make_live() 386 387 def add_on_idle_handler(self) -> None: 388 if not self.use_higher_level_sleep_manager: 389 self.current_on_idle_handler = self._on_system_loop_idle 390 self._loop.on_idle_handlers.add(self._on_system_loop_idle) 391 392 def discard_on_idle_handler(self) -> None: 393 self._loop.on_idle_handlers.discard(self._on_system_loop_idle) 394 self.current_on_idle_handler = None 395 396 def _on__use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool) -> Tuple[bool, Optional[None], None]: 397 self.use_higher_level_sleep_manager = use_higher_level_sleep_manager 398 if use_higher_level_sleep_manager: 399 if self.current_on_idle_handler is not None: 400 self._loop.on_idle_handlers.add(self.current_on_idle_handler) 401 else: 402 self.discard_on_idle_handler() 403 404 return True, None, None 405 406 def _on__low_latency_io_mode(self, low_latency_io_mode: bool) -> Tuple[bool, Optional[bool], None]: 407 buff_low_latency_io_mode = self.low_latency_io_mode > 0 408 if low_latency_io_mode: 409 self.low_latency_io_mode += 1 410 else: 411 self.low_latency_io_mode -= 1 412 413 return True, buff_low_latency_io_mode, None 414 415 def _on_turn_on_loops_intercommunication(self, turn_on: bool) -> Tuple[bool, Optional[Callable], None]: 416 result = self._loop.on_wrong_request 417 if turn_on: 418 self._previous_on_wrong_request = self._loop.on_wrong_request 419 self.loops_intercommunication = True 420 self._loop.on_wrong_request = self._on_wrong_request 421 else: 422 if self.loops_intercommunication: 423 self._loop.on_wrong_request = self._previous_on_wrong_request 424 self.loops_intercommunication = False 425 426 self._previous_on_wrong_request = None 427 428 return True, result, None 429 430 def _on_wrong_request(self, coro: CoroWrapperBase, request: Any) -> Request: 431 if request is None: 432 args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_sleep_0())) 433 result: Request = Request(coro, type(self), *args, **kwargs) 434 else: 435 args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_request(request))) 436 result = Request(coro, type(self), *args, **kwargs) 437 438 return result 439 440 def inline_get(self): 441 if self.async_loop is None: 442 raise AsyncioLoopWasNotSetError 443 else: 444 return self.async_loop 445 446 def inline_set_internal_loop(self, loop, exception: Optional[Exception]): 447 self.internal_async_loop = loop 448 self.internal_loop_creation_error = exception 449 self.make_live() 450 451 def is_need_to_yield_internal_loop(self) -> bool: 452 return not self.pending_requests_num
AsyncioLoop( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
141 def __init__(self, loop: CoroSchedulerType): 142 super(AsyncioLoop, self).__init__(loop) 143 self.async_loop: Optional[AbstractEventLoop] = None 144 self.internal_async_loop: Optional[AbstractEventLoop] = None 145 self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None 146 self.internal_loop_start_waiters: Set[CoroID] = set() 147 self.need_to_stop_internal_loop: bool = False 148 self.internal_loop_creation_error: Optional[Exception] = None 149 self.internal_loop_in_yield: bool = False 150 self.waiting_for_new_requests: bool = False 151 self.loops_intercommunication: bool = False 152 self._previous_on_wrong_request = None 153 self.intercommunication_requests_coro_ids: Set[CoroID] = set() 154 155 self._request_workers = { 156 0: self._on_inherit_surrounding_loop, 157 1: self._on_start_internal_loop, 158 2: self._on_ensure_loop, 159 3: self._on_set, 160 4: self._on_get, 161 5: self._on_await, 162 6: self._on_create_task, 163 7: self._on__internal_loop_yield, 164 8: self._on_turn_on_loops_intercommunication, 165 9: self._on_await, 166 10: self._on_await, 167 11: self._on__internal_wait_for_new_requests, 168 12: self._on__use_higher_level_sleep_manager, 169 13: self._on__low_latency_io_mode, 170 } 171 172 self.pending_requests_num: int = 0 173 self.new_requests_num: int = 0 174 self.no_idle_calls: Set[CoroID] = set() 175 self.results: Dict[CoroID, Tuple[Any, Exception]] = dict() 176 self._waiting_coro_id: Optional[CoroID] = None 177 self._original_loop_class: Type = None 178 self._idle_for: Optional[RationalNumber] = None # in seconds 179 self.use_higher_level_sleep_manager: bool = False 180 self.current_on_idle_handler: Optional[Callable] = None 181 self.low_latency_io_mode: int = 0
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
def
single_task_registration_or_immediate_processing( self, request: Union[AsyncioLoopRequest, NoneType] = None) -> Tuple[bool, NoneType, NoneType]:
def
full_processing_iteration(self):
204 def full_processing_iteration(self): 205 from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event 206 if (self._waiting_coro_id is not None) and self.new_requests_num: 207 # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro) 208 # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled) 209 kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro) 210 kill_coro_service._add_direct_request(self._waiting_coro_id) 211 self._waiting_coro_id = None 212 try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None) 213 214 if self.internal_loop_in_yield: 215 if self.pending_requests_num: 216 self.register_response(self._internal_loop_holding_coro.coro_id, None, None) 217 self.internal_loop_in_yield = False 218 219 if self.internal_loop_start_waiters: 220 loop_response = None 221 exception_response = None 222 if self.internal_async_loop is not None: 223 loop_response = self.async_loop = self.internal_async_loop 224 elif self.internal_loop_creation_error is not None: 225 exception_response = self.internal_loop_creation_error 226 227 if loop_response or exception_response: 228 for coro_id in self.internal_loop_start_waiters: 229 self.register_response(coro_id, loop_response, exception_response) 230 231 self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)() 232 233 for coro_id, response in self.results.items(): 234 result, exception = response 235 if coro_id in self.intercommunication_requests_coro_ids: 236 self.intercommunication_requests_coro_ids.remove(coro_id) 237 self._responses.append(DirectResponse(coro_id, type(self), result, exception)) 238 else: 239 self.register_response(coro_id, result, exception) 240 241 self.pending_requests_num -= len(self.results) 242 self.results = type(self.results)() 243 244 if not self.no_idle_calls: 245 self.make_dead()
def
in_work(self) -> bool:
250 def in_work(self) -> bool: 251 result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results) 252 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
def
register_await_response( self, coro_id: int, response: Any, exception: Union[Exception, NoneType]):
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- make_live
- make_dead
- service_id_impl
- service_id
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class
AsyncioLoopRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
85class AsyncioLoopRequest(ServiceRequest): 86 def inherit_surrounding_loop(self) -> AbstractEventLoop: 87 return self._save(0) 88 89 def start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> AbstractEventLoop: 90 return self._save(1, main_awaitable, priority, interrupt_when_no_requests, debug) 91 92 def ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> AbstractEventLoop: 93 return self._save(2, main_awaitable, priority, interrupt_when_no_requests) 94 95 def set(self, async_loop) -> None: 96 return self._save(3, async_loop) 97 98 def get(self) -> AbstractEventLoop: 99 return self._save(4) 100 101 def wait(self, awaitable: Awaitable) -> Any: 102 return self._save(5, awaitable, False, True) 103 104 def create_task(self, awaitable: Awaitable) -> asyncio_Task: 105 return self._save(6, awaitable) 106 107 def _internal_loop_yield(self) -> None: 108 return self._save(7) 109 110 def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Optional[Callable]: 111 return self._save(8, turn_on) 112 113 def _wait_intercommunication(self, awaitable: Awaitable) -> Any: 114 return self._save(9, awaitable, True, True) 115 116 def wait_idle(self, awaitable: Awaitable) -> Any: 117 return self._save(10, awaitable, False, False) 118 119 def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any: 120 return self._save(12, use_higher_level_sleep_manager) 121 122 def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any: 123 return self._save(13, low_latency_io_mode)
def
start_internal_loop( self, main_awaitable: Union[Awaitable, NoneType] = None, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = None, interrupt_when_no_requests: Union[bool, NoneType] = None, debug: bool = False) -> asyncio.events.AbstractEventLoop:
def
ensure_loop( self, main_awaitable: Union[Awaitable, NoneType] = None, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = None, interrupt_when_no_requests: Union[bool, NoneType] = None) -> asyncio.events.AbstractEventLoop:
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'AsyncioLoop'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
class
AsyncioLoopWasNotSetError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
def
run_in_thread_pool(function: Callable, *args, **kwargs) -> Any:
def
run_in_thread_pool_fast( interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, function: Callable, *args, **kwargs) -> Any:
649def run_in_thread_pool_fast(interface: Interface, function: Callable, *args, **kwargs) -> Any: 650 async def task_wrapper(loop, *args, **kwargs): 651 return await task_in_thread_pool(loop, function, *args, **kwargs) 652 653 return interface(AsyncioLoop, AsyncioLoopRequest().wait(task_wrapper( 654 interface._loop.get_service_instance(AsyncioLoop).inline_get(), 655 *args, **kwargs)))