cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop.versions.v_0.asyncio_loop

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['AsyncioLoop', 'AsyncioLoopRequest', 'AsyncioLoopWasNotSetError', 'run_in_thread_pool', 'run_in_thread_pool_fast']
 20
 21from cengal.parallel_execution.coroutines.coro_scheduler import *
 22from cengal.parallel_execution.coroutines.coro_tools.await_coro import create_task
 23from cengal.parallel_execution.coroutines.coro_tools.await_coro import task_in_thread_pool
 24from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly, agly, CoroPriority, LoopYieldPriorityScheduler
 25from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield
 26from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
 27from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 28from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro import ThrowCoro
 29from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import KillCoro
 30from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest, CoroutineNotFoundError
 31from cengal.parallel_execution.asyncio.run_loop import run_forever, cancel_all_tasks
 32from cengal.parallel_execution.asyncio.atasks import create_task_awaitable
 33from cengal.file_system.file_manager import path_relative_to_current_dir
 34from cengal.time_management.cpu_clock_cycles import perf_counter
 35from cengal.time_management.sleep_tools import get_usable_min_sleep_interval, get_min_sleep_interval
 36from cengal.data_manipulation.serialization import *
 37from cengal.introspection.inspect import get_exception
 38from cengal.math.numbers import RationalNumber
 39from typing import Callable, Tuple, List, Any, Dict, Awaitable, Type
 40import sys
 41import os
 42from asyncio import AbstractEventLoop, get_event_loop, get_running_loop, Task as asyncio_Task, sleep as asyncio_sleep
 43from asyncio.coroutines import _is_coroutine
 44from cengal.code_flow_control.args_manager import args_kwargs
 45from types import coroutine as types_coroutine
 46from asyncio import coroutines
 47from asyncio import events
 48from asyncio import tasks
 49import threading
 50
 51
 52"""
 53Module Docstring
 54Docstrings: http://www.python.org/dev/peps/pep-0257/
 55"""
 56
 57__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 58__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 59__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 60__license__ = "Apache License, Version 2.0"
 61__version__ = "4.4.1"
 62__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 63__email__ = "gtalk@butenkoms.space"
 64# __status__ = "Prototype"
 65__status__ = "Development"
 66# __status__ = "Production"
 67
 68
 69class AsyncioLoopWasNotSetError(Exception):
 70    pass
 71
 72
 73class ExternalAsyncioLoopAlreadyExistsError(Exception):
 74    pass
 75
 76
 77class WaitingCancelled(Exception):
 78    pass
 79
 80
 81WAITING_FOR_NEW_REQUESTS_EVENT = 'AsyncioLoopRequest - WaitingForNewRequestsEvent'
 82
 83
 84class AsyncioLoopRequest(ServiceRequest):
 85    def inherit_surrounding_loop(self) -> AbstractEventLoop:
 86        return self._save(0)
 87
 88    def start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> AbstractEventLoop:
 89        return self._save(1, main_awaitable, priority, interrupt_when_no_requests, debug)
 90
 91    def ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> AbstractEventLoop:
 92        return self._save(2, main_awaitable, priority, interrupt_when_no_requests)
 93
 94    def set(self, async_loop) -> None:
 95        return self._save(3, async_loop)
 96
 97    def get(self) -> AbstractEventLoop:
 98        return self._save(4)
 99
100    def wait(self, awaitable: Awaitable) -> Any:
101        return self._save(5, awaitable, False, True)
102
103    def create_task(self, awaitable: Awaitable) -> asyncio_Task:
104        return self._save(6, awaitable)
105
106    def _internal_loop_yield(self) -> None:
107        return self._save(7)
108
109    def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Optional[Callable]:
110        return self._save(8, turn_on)
111
112    def _wait_intercommunication(self, awaitable: Awaitable) -> Any:
113        return self._save(9, awaitable, True, True)
114
115    def wait_idle(self, awaitable: Awaitable) -> Any:
116        return self._save(10, awaitable, False, False)
117
118    def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any:
119        return self._save(12, use_higher_level_sleep_manager)
120
121    def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any:
122        return self._save(13, low_latency_io_mode)
123
124
125async def asyncio_coro_sleep_0():
126    return await asyncio_sleep(0)
127
128
129def request_giver(request):
130    yield request
131
132@types_coroutine
133def asyncio_coro_request(request):
134    return (yield from request_giver(request))
135
136asyncio_coro_request._is_coroutine = _is_coroutine
137
138
139class AsyncioLoop(Service, EntityStatsMixin):
140    def __init__(self, loop: CoroSchedulerType):
141        super(AsyncioLoop, self).__init__(loop)
142        self.async_loop: Optional[AbstractEventLoop] = None
143        self.internal_async_loop: Optional[AbstractEventLoop] = None
144        self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None
145        self.internal_loop_start_waiters: Set[CoroID] = set()
146        self.need_to_stop_internal_loop: bool = False
147        self.internal_loop_creation_error: Optional[Exception] = None
148        self.internal_loop_in_yield: bool = False
149        self.waiting_for_new_requests: bool = False
150        self.loops_intercommunication: bool = False
151        self._previous_on_wrong_request = None
152        self.intercommunication_requests_coro_ids: Set[CoroID] = set()
153
154        self._request_workers = {
155            0: self._on_inherit_surrounding_loop,
156            1: self._on_start_internal_loop,
157            2: self._on_ensure_loop,
158            3: self._on_set,
159            4: self._on_get,
160            5: self._on_await,
161            6: self._on_create_task,
162            7: self._on__internal_loop_yield,
163            8: self._on_turn_on_loops_intercommunication,
164            9: self._on_await,
165            10: self._on_await,
166            11: self._on__internal_wait_for_new_requests, 
167            12: self._on__use_higher_level_sleep_manager, 
168            13: self._on__low_latency_io_mode, 
169        }
170        
171        self.pending_requests_num: int = 0
172        self.new_requests_num: int = 0
173        self.no_idle_calls: Set[CoroID] = set()
174        self.results: Dict[CoroID, Tuple[Any, Exception]] = dict()
175        self._waiting_coro_id: Optional[CoroID] = None
176        self._original_loop_class: Type = None
177        self._idle_for: Optional[RationalNumber] = None  # in seconds
178        self.use_higher_level_sleep_manager: bool = False
179        self.current_on_idle_handler: Optional[Callable] = None
180        self.low_latency_io_mode: int = 0
181    
182    def destroy(self):
183        if self.internal_async_loop is not None:
184            events._set_running_loop(None)
185
186    def _on_system_loop_idle(self, next_event_after: Optional[RationalNumber]):
187        if next_event_after is None:
188            self._idle_for = max(0.001, get_usable_min_sleep_interval())
189        else:
190            self._idle_for = max(0.001, next_event_after)
191
192    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
193        return type(self).__name__, {
194            'pending requests num': self.pending_requests_num
195        }
196
197    def single_task_registration_or_immediate_processing(
198            self, request: Optional[AsyncioLoopRequest]=None) -> Tuple[bool, None, None]:
199        if request is not None:
200            return self.resolve_request(request)
201        return True, None, None
202
203    def full_processing_iteration(self):
204        from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
205        if (self._waiting_coro_id is not None) and self.new_requests_num:
206            # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro)
207            # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled)
208            kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro)
209            kill_coro_service._add_direct_request(self._waiting_coro_id)
210            self._waiting_coro_id = None
211            try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None)
212
213        if self.internal_loop_in_yield:
214            if self.pending_requests_num:
215                self.register_response(self._internal_loop_holding_coro.coro_id, None, None)
216                self.internal_loop_in_yield = False
217
218        if self.internal_loop_start_waiters:
219            loop_response = None
220            exception_response = None
221            if self.internal_async_loop is not None:
222                loop_response = self.async_loop = self.internal_async_loop
223            elif self.internal_loop_creation_error is not None:
224                exception_response = self.internal_loop_creation_error
225            
226            if loop_response or exception_response:
227                for coro_id in self.internal_loop_start_waiters:
228                    self.register_response(coro_id, loop_response, exception_response)
229
230                self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)()
231
232        for coro_id, response in self.results.items():
233            result, exception = response
234            if coro_id in self.intercommunication_requests_coro_ids:
235                self.intercommunication_requests_coro_ids.remove(coro_id)
236                self._responses.append(DirectResponse(coro_id, type(self), result, exception))
237            else:
238                self.register_response(coro_id, result, exception)
239        
240        self.pending_requests_num -= len(self.results)
241        self.results = type(self.results)()
242        
243        if not self.no_idle_calls:
244            self.make_dead()
245    
246    def is_low_latency(self) -> bool:
247        return True
248
249    def in_work(self) -> bool:
250        result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results)
251        return self.thrifty_in_work(result)
252    
253    def _on_inherit_surrounding_loop(self) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
254        exception = None
255        try:
256            self.async_loop = get_running_loop()
257        except:
258            exception = get_exception()
259
260        return True, self.async_loop, exception
261    
262    def _on_start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
263        if self.internal_async_loop:
264            self.async_loop = self.internal_async_loop
265            return True, self.async_loop, None
266        
267        coro_id = self.current_caller_coro_info.coro_id
268        self.internal_loop_start_waiters.add(coro_id)
269        if self._internal_loop_holding_coro is None:
270            self.internal_async_loop = None
271            self.internal_loop_creation_error = None
272            try:
273                if self._is_asyncio_loop_has_run_once_method():
274                    internal_loop_holding_coro_worker = ExplicitWorker(CoroType.awaitable, _internal_loop_holding_coro_run_once_based)
275                else:
276                    internal_loop_holding_coro_worker = ExplicitWorker(CoroType.greenlet, _internal_loop_holding_coro)
277            except ExternalAsyncioLoopAlreadyExistsError as ex:
278                return True, None, ex
279            
280            self._internal_loop_holding_coro = self._loop.put_coro(internal_loop_holding_coro_worker, self, main_awaitable, priority, interrupt_when_no_requests, debug)
281            if interrupt_when_no_requests:
282                self._internal_loop_holding_coro.is_background_coro = True
283
284        return False, None, None
285    
286    def _is_asyncio_loop_has_run_once_method(self) -> bool:
287        result = False
288        if events._get_running_loop() is not None:
289            raise ExternalAsyncioLoopAlreadyExistsError
290
291        loop = None
292        try:
293            loop = events.new_event_loop()
294            if hasattr(loop, '_run_once'):
295                result = True
296        finally:
297            if loop is not None:
298                loop.close()
299        
300        return result
301    
302    def _on_ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
303        result_exists, result, exception = self._on_get()
304        if result and (exception is None):
305            return result_exists, result, exception
306
307        result_exists, result, exception = self._on_inherit_surrounding_loop()
308        if result and (exception is None):
309            return result_exists, result, exception
310        
311        return self._on_start_internal_loop(main_awaitable, priority, interrupt_when_no_requests)
312
313    def _on_set(self, async_loop):
314        self.async_loop = async_loop
315        return True, None, None
316
317    def _on_get(self):
318        if self.async_loop is None:
319            exception = AsyncioLoopWasNotSetError()
320        else:
321            exception = None
322        
323        return True, self.async_loop, exception
324    
325    def register_await_response(self, coro_id: CoroID, response: Any, exception: Optional[Exception]):
326        self.results[coro_id] = (response, exception)
327        self.no_idle_calls.discard(coro_id)
328        self.make_live()
329
330    def _on_await(self, awaitable: Awaitable, intercommunication_request: bool = False, prevent_idle: bool = True) -> Tuple[bool, Any, Optional[Exception]]:
331        if self.async_loop is None:
332            return True, None, AsyncioLoopWasNotSetError()
333        
334        coro_id = self.current_caller_coro_info.coro_id
335        if intercommunication_request:
336            self.intercommunication_requests_coro_ids.add(coro_id)
337        
338        async def awaiting_worker(asyncio_loop_instance: AsyncioLoop, coro_id: CoroID, awaitable: Awaitable):
339            exception = None
340            result = None
341            try:
342                result = await awaitable
343            except:
344                exception = get_exception()
345            
346            asyncio_loop_instance.register_await_response(coro_id, result, exception)
347        
348        create_task(self.async_loop, awaiting_worker, self, coro_id, awaitable)
349        
350        self.pending_requests_num += 1
351        if prevent_idle:
352            self.no_idle_calls.add(coro_id)
353        
354        self.make_live()
355        
356        return False, None, None
357
358    def _on_create_task(self, awaitable: Awaitable) -> Tuple[bool, Optional[asyncio_Task], Optional[Exception]]:
359        if self.async_loop is None:
360            return True, None, AsyncioLoopWasNotSetError()
361
362        async def awaiting_wrapper(awaitable: Awaitable):
363            return await awaitable
364
365        task: asyncio_Task = create_task(self.async_loop, awaiting_wrapper, awaitable)
366        return True, task, None
367    
368    def _on__internal_loop_yield(self) -> Tuple[bool, None, None]:
369        if self.pending_requests_num:
370            return True, None, None
371        else:
372            self.internal_loop_in_yield = True
373            return False, None, None
374    
375    def _on__internal_wait_for_new_requests(self) -> Tuple[bool, None, None]:
376        if self.new_requests_num:
377            return True, None, None
378        else:
379            self.waiting_for_new_requests = True
380            return False, None, None
381    
382    def register_new_asyncio_request(self) -> None:
383        self.new_requests_num += 1
384        self.make_live()
385    
386    def add_on_idle_handler(self) -> None:
387        if not self.use_higher_level_sleep_manager:
388            self.current_on_idle_handler = self._on_system_loop_idle
389            self._loop.on_idle_handlers.add(self._on_system_loop_idle)
390    
391    def discard_on_idle_handler(self) -> None:
392        self._loop.on_idle_handlers.discard(self._on_system_loop_idle)
393        self.current_on_idle_handler = None
394    
395    def _on__use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool) -> Tuple[bool, Optional[None], None]:
396        self.use_higher_level_sleep_manager = use_higher_level_sleep_manager
397        if use_higher_level_sleep_manager:
398            if self.current_on_idle_handler is not None:
399                self._loop.on_idle_handlers.add(self.current_on_idle_handler)
400        else:
401            self.discard_on_idle_handler()
402        
403        return True, None, None
404
405    def _on__low_latency_io_mode(self, low_latency_io_mode: bool) -> Tuple[bool, Optional[bool], None]:
406        buff_low_latency_io_mode = self.low_latency_io_mode > 0
407        if low_latency_io_mode:
408            self.low_latency_io_mode += 1
409        else:
410            self.low_latency_io_mode -= 1
411        
412        return True, buff_low_latency_io_mode, None
413    
414    def _on_turn_on_loops_intercommunication(self, turn_on: bool) -> Tuple[bool, Optional[Callable], None]:
415        result = self._loop.on_wrong_request
416        if turn_on:
417            self._previous_on_wrong_request = self._loop.on_wrong_request
418            self.loops_intercommunication = True
419            self._loop.on_wrong_request = self._on_wrong_request
420        else:
421            if self.loops_intercommunication:
422                self._loop.on_wrong_request = self._previous_on_wrong_request
423                self.loops_intercommunication = False
424            
425            self._previous_on_wrong_request = None
426        
427        return True, result, None
428    
429    def _on_wrong_request(self, coro: CoroWrapperBase, request: Any) -> Request:
430        if request is None:
431            args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_sleep_0()))
432            result: Request = Request(coro, type(self), *args, **kwargs)
433        else:
434            args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_request(request)))
435            result = Request(coro, type(self), *args, **kwargs)
436
437        return result
438    
439    def inline_get(self):
440        if self.async_loop is None:
441            raise AsyncioLoopWasNotSetError
442        else:
443            return self.async_loop
444    
445    def inline_set_internal_loop(self, loop, exception: Optional[Exception]):
446        self.internal_async_loop = loop
447        self.internal_loop_creation_error = exception
448        self.make_live()
449    
450    def is_need_to_yield_internal_loop(self) -> bool:
451        return not self.pending_requests_num
452
453
454AsyncioLoopRequest.default_service_type = AsyncioLoop
455
456
457def _internal_loop_holding_coro(i: Interface, service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False):
458    ly = None
459    if priority is not None:
460        ly = gly(priority)
461
462    async def main_wrapper(service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None):
463        loop: AbstractEventLoop = get_event_loop()
464        service.inline_set_internal_loop(loop, None)
465        if interrupt_when_no_requests is None:
466            interrupt_when_no_requests = main_awaitable is None
467        
468        def on_loop_simple_yield():
469            if interrupt_when_no_requests and service.is_need_to_yield_internal_loop():
470                i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield())
471            else:
472                i(Yield)
473            
474            if not service.need_to_stop_internal_loop:
475                if service._idle_for is None:
476                    loop.call_soon(on_loop_simple_yield)
477                else:
478                    idle_for = service._idle_for
479                    service._idle_for = None
480                    loop.call_later(idle_for, on_loop_simple_yield)
481        
482        def on_loop_loop_yield():
483            if interrupt_when_no_requests and service.is_need_to_yield_internal_loop():
484                i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield())
485            else:
486                ly()
487            
488            if not service.need_to_stop_internal_loop:
489                if service._idle_for is None:
490                    loop.call_soon(on_loop_loop_yield)
491                else:
492                    idle_for = service._idle_for
493                    service._idle_for = None
494                    loop.call_later(idle_for, on_loop_loop_yield)
495        
496        if ly is None:
497            loop.call_soon(on_loop_simple_yield)
498        else:
499            loop.call_soon(on_loop_loop_yield)
500
501        if main_awaitable is not None:
502            create_task_awaitable(main_awaitable)
503    
504    exception = None
505    try:
506        service.add_on_idle_handler()
507        run_forever(main_wrapper(service, main_awaitable, priority, interrupt_when_no_requests), debug=debug)
508    except:
509        exception = get_exception()
510        # TODO: do something with exception
511    finally:
512        try:
513            service._loop.on_idle_handlers.discard(service._on_system_loop_idle)
514            service.current_on_idle_handler = None
515        except ValueError:
516            pass
517
518        service.inline_set_internal_loop(None, exception)
519
520
521async def _internal_loop_holding_coro_run_once_based(i: Interface, service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False):
522    from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
523    from .known_asyncio_compatible_loops import prepare_loop, restore_loop
524
525    cs: CoroSchedulerType = current_coro_scheduler()
526    lyps: LoopYieldPriorityScheduler = cs.get_service_instance(LoopYieldPriorityScheduler)
527    umsi: RationalNumber = get_usable_min_sleep_interval()
528
529    ly = None
530    if priority is not None:
531        ly = await agly(priority)
532
533    async def main_wrapper_for_run_once(service: AsyncioLoop, main_awaitable: Optional[Awaitable] = None):
534        loop: AbstractEventLoop = get_event_loop()
535        service.inline_set_internal_loop(loop, None)
536        if main_awaitable is not None:
537            create_task_awaitable(main_awaitable)
538    
539    if interrupt_when_no_requests is None:
540        interrupt_when_no_requests = main_awaitable is None
541    
542    main_wrapper_for_run_once_coro = main_wrapper_for_run_once(service, main_awaitable)
543    
544    exception = None
545    try:
546        if events._get_running_loop() is not None:
547            raise RuntimeError(
548                'run_forever() cannot be called from a running event loop')
549
550        if not coroutines.iscoroutine(main_wrapper_for_run_once_coro):
551            raise ValueError('a coroutine was expected, got {!r}'.format(main_wrapper_for_run_once))
552
553        loop = events.new_event_loop()
554        service._original_loop_class = prepare_loop(loop)
555        try:
556            events.set_event_loop(loop)
557            loop.set_debug(debug)
558            loop.create_task(main_wrapper_for_run_once_coro)
559            loop._check_closed()
560            loop._check_running()
561            loop._set_coroutine_origin_tracking(loop._debug)
562            loop._thread_id = threading.get_ident()
563
564            old_agen_hooks = sys.get_asyncgen_hooks()
565            sys.set_asyncgen_hooks(firstiter=loop._asyncgen_firstiter_hook,
566                                finalizer=loop._asyncgen_finalizer_hook)
567            try:
568                events._set_running_loop(loop)
569                while True:
570                    if ly is None:
571                        await i(Yield)
572                    else:
573                        await ly()
574
575                    loop.call_soon(lambda: None)
576                    # if (not loop._ready) and (not loop._stopping) and (not loop._scheduled):
577                    #     loop.call_soon(lambda: None)
578                    
579                    loop._run_once()
580                    if loop._stopping:
581                        break
582
583                    if (not loop._ready) and (not loop._stopping):
584                        if not loop._scheduled:
585                            await i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield())
586                            continue
587                        else:
588                            when = loop._scheduled[0]._when
589                            from asyncio.base_events import MAXIMUM_SELECT_TIMEOUT
590                            timeout = min(max(0, when - loop.time()), MAXIMUM_SELECT_TIMEOUT)
591                            # if get_min_sleep_interval() <= timeout:
592                            #     usable_min_sleep_interval = get_usable_min_sleep_interval()
593                            #     if usable_min_sleep_interval > timeout:
594                            #         timeout = usable_min_sleep_interval
595
596                            if (not service.low_latency_io_mode) and (0.001 <= timeout) and (not service.results):
597                                service.new_requests_num = 0
598                                service.make_dead()
599                                async def waiting_coro(i: Interface, timeout: float):
600                                    from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
601                                    try:
602                                        await i(Sleep, timeout)
603                                        await i(AsyncEventBusRequest().send_event(WAITING_FOR_NEW_REQUESTS_EVENT, None))
604                                    except WaitingCancelled:
605                                        print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WaitingCancelled')
606                                
607                                from datetime import datetime
608                                try:
609                                    # TODO: reimplement it in more efficient and elegant way. Btw: Sleep currently will not cancel an event upon coro destroyed
610                                    lyps_max_delay = lyps.max_delay
611                                    new_max_timeout = umsi if lyps_max_delay < umsi else lyps_max_delay
612                                    new_timeout = timeout if new_max_timeout > timeout else new_max_timeout
613                                    waiting_coro_id = await i(PutCoro, waiting_coro, new_timeout)
614                                    service._waiting_coro_id = waiting_coro_id
615                                    # await i(WaitCoro, WaitCoroRequest().single(waiting_coro_id))
616                                    await i(AsyncEventBusRequest().wait(WAITING_FOR_NEW_REQUESTS_EVENT))
617                                    # print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WAITING_FOR_NEW_REQUESTS_EVENT')
618                                except (WaitingCancelled, CoroutineNotFoundError):
619                                    print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> WaitingCancelled or CoroutineNotFoundError')
620                                    pass
621                                except:
622                                    print(f'AsyncIoLoop {datetime.now().strftime("%H:%M:%S.%f")} >> {get_exception()}')
623                                    raise
624
625                    if interrupt_when_no_requests and service.is_need_to_yield_internal_loop():
626                        await i(AsyncioLoop, AsyncioLoopRequest()._internal_loop_yield())
627                        continue
628            finally:
629                loop._stopping = False
630                loop._thread_id = None
631                events._set_running_loop(None)
632                loop._set_coroutine_origin_tracking(False)
633                sys.set_asyncgen_hooks(*old_agen_hooks)
634        finally:
635            restore_loop(loop, service._original_loop_class)
636            try:
637                cancel_all_tasks(loop)
638                loop.run_until_complete(loop.shutdown_asyncgens())
639            finally:
640                events.set_event_loop(None)
641                loop.close()
642    except:
643        exception = get_exception()
644    finally:
645        service.inline_set_internal_loop(None, exception)
646
647
648def run_in_thread_pool_fast(interface: Interface, function: Callable, *args, **kwargs) -> Any:
649    async def task_wrapper(loop, *args, **kwargs):
650        return await task_in_thread_pool(loop, function, *args, **kwargs)
651
652    return interface(AsyncioLoop, AsyncioLoopRequest().wait(task_wrapper(
653        interface._loop.get_service_instance(AsyncioLoop).inline_get(), 
654        *args, **kwargs)))
655
656
657def run_in_thread_pool(function: Callable, *args, **kwargs) -> Any:
658    return run_in_thread_pool_fast(current_interface(), function, *args, **kwargs)
class AsyncioLoop(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
140class AsyncioLoop(Service, EntityStatsMixin):
141    def __init__(self, loop: CoroSchedulerType):
142        super(AsyncioLoop, self).__init__(loop)
143        self.async_loop: Optional[AbstractEventLoop] = None
144        self.internal_async_loop: Optional[AbstractEventLoop] = None
145        self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None
146        self.internal_loop_start_waiters: Set[CoroID] = set()
147        self.need_to_stop_internal_loop: bool = False
148        self.internal_loop_creation_error: Optional[Exception] = None
149        self.internal_loop_in_yield: bool = False
150        self.waiting_for_new_requests: bool = False
151        self.loops_intercommunication: bool = False
152        self._previous_on_wrong_request = None
153        self.intercommunication_requests_coro_ids: Set[CoroID] = set()
154
155        self._request_workers = {
156            0: self._on_inherit_surrounding_loop,
157            1: self._on_start_internal_loop,
158            2: self._on_ensure_loop,
159            3: self._on_set,
160            4: self._on_get,
161            5: self._on_await,
162            6: self._on_create_task,
163            7: self._on__internal_loop_yield,
164            8: self._on_turn_on_loops_intercommunication,
165            9: self._on_await,
166            10: self._on_await,
167            11: self._on__internal_wait_for_new_requests, 
168            12: self._on__use_higher_level_sleep_manager, 
169            13: self._on__low_latency_io_mode, 
170        }
171        
172        self.pending_requests_num: int = 0
173        self.new_requests_num: int = 0
174        self.no_idle_calls: Set[CoroID] = set()
175        self.results: Dict[CoroID, Tuple[Any, Exception]] = dict()
176        self._waiting_coro_id: Optional[CoroID] = None
177        self._original_loop_class: Type = None
178        self._idle_for: Optional[RationalNumber] = None  # in seconds
179        self.use_higher_level_sleep_manager: bool = False
180        self.current_on_idle_handler: Optional[Callable] = None
181        self.low_latency_io_mode: int = 0
182    
183    def destroy(self):
184        if self.internal_async_loop is not None:
185            events._set_running_loop(None)
186
187    def _on_system_loop_idle(self, next_event_after: Optional[RationalNumber]):
188        if next_event_after is None:
189            self._idle_for = max(0.001, get_usable_min_sleep_interval())
190        else:
191            self._idle_for = max(0.001, next_event_after)
192
193    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
194        return type(self).__name__, {
195            'pending requests num': self.pending_requests_num
196        }
197
198    def single_task_registration_or_immediate_processing(
199            self, request: Optional[AsyncioLoopRequest]=None) -> Tuple[bool, None, None]:
200        if request is not None:
201            return self.resolve_request(request)
202        return True, None, None
203
204    def full_processing_iteration(self):
205        from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
206        if (self._waiting_coro_id is not None) and self.new_requests_num:
207            # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro)
208            # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled)
209            kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro)
210            kill_coro_service._add_direct_request(self._waiting_coro_id)
211            self._waiting_coro_id = None
212            try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None)
213
214        if self.internal_loop_in_yield:
215            if self.pending_requests_num:
216                self.register_response(self._internal_loop_holding_coro.coro_id, None, None)
217                self.internal_loop_in_yield = False
218
219        if self.internal_loop_start_waiters:
220            loop_response = None
221            exception_response = None
222            if self.internal_async_loop is not None:
223                loop_response = self.async_loop = self.internal_async_loop
224            elif self.internal_loop_creation_error is not None:
225                exception_response = self.internal_loop_creation_error
226            
227            if loop_response or exception_response:
228                for coro_id in self.internal_loop_start_waiters:
229                    self.register_response(coro_id, loop_response, exception_response)
230
231                self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)()
232
233        for coro_id, response in self.results.items():
234            result, exception = response
235            if coro_id in self.intercommunication_requests_coro_ids:
236                self.intercommunication_requests_coro_ids.remove(coro_id)
237                self._responses.append(DirectResponse(coro_id, type(self), result, exception))
238            else:
239                self.register_response(coro_id, result, exception)
240        
241        self.pending_requests_num -= len(self.results)
242        self.results = type(self.results)()
243        
244        if not self.no_idle_calls:
245            self.make_dead()
246    
247    def is_low_latency(self) -> bool:
248        return True
249
250    def in_work(self) -> bool:
251        result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results)
252        return self.thrifty_in_work(result)
253    
254    def _on_inherit_surrounding_loop(self) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
255        exception = None
256        try:
257            self.async_loop = get_running_loop()
258        except:
259            exception = get_exception()
260
261        return True, self.async_loop, exception
262    
263    def _on_start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
264        if self.internal_async_loop:
265            self.async_loop = self.internal_async_loop
266            return True, self.async_loop, None
267        
268        coro_id = self.current_caller_coro_info.coro_id
269        self.internal_loop_start_waiters.add(coro_id)
270        if self._internal_loop_holding_coro is None:
271            self.internal_async_loop = None
272            self.internal_loop_creation_error = None
273            try:
274                if self._is_asyncio_loop_has_run_once_method():
275                    internal_loop_holding_coro_worker = ExplicitWorker(CoroType.awaitable, _internal_loop_holding_coro_run_once_based)
276                else:
277                    internal_loop_holding_coro_worker = ExplicitWorker(CoroType.greenlet, _internal_loop_holding_coro)
278            except ExternalAsyncioLoopAlreadyExistsError as ex:
279                return True, None, ex
280            
281            self._internal_loop_holding_coro = self._loop.put_coro(internal_loop_holding_coro_worker, self, main_awaitable, priority, interrupt_when_no_requests, debug)
282            if interrupt_when_no_requests:
283                self._internal_loop_holding_coro.is_background_coro = True
284
285        return False, None, None
286    
287    def _is_asyncio_loop_has_run_once_method(self) -> bool:
288        result = False
289        if events._get_running_loop() is not None:
290            raise ExternalAsyncioLoopAlreadyExistsError
291
292        loop = None
293        try:
294            loop = events.new_event_loop()
295            if hasattr(loop, '_run_once'):
296                result = True
297        finally:
298            if loop is not None:
299                loop.close()
300        
301        return result
302    
303    def _on_ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> Tuple[bool, Optional[AbstractEventLoop], Exception]:
304        result_exists, result, exception = self._on_get()
305        if result and (exception is None):
306            return result_exists, result, exception
307
308        result_exists, result, exception = self._on_inherit_surrounding_loop()
309        if result and (exception is None):
310            return result_exists, result, exception
311        
312        return self._on_start_internal_loop(main_awaitable, priority, interrupt_when_no_requests)
313
314    def _on_set(self, async_loop):
315        self.async_loop = async_loop
316        return True, None, None
317
318    def _on_get(self):
319        if self.async_loop is None:
320            exception = AsyncioLoopWasNotSetError()
321        else:
322            exception = None
323        
324        return True, self.async_loop, exception
325    
326    def register_await_response(self, coro_id: CoroID, response: Any, exception: Optional[Exception]):
327        self.results[coro_id] = (response, exception)
328        self.no_idle_calls.discard(coro_id)
329        self.make_live()
330
331    def _on_await(self, awaitable: Awaitable, intercommunication_request: bool = False, prevent_idle: bool = True) -> Tuple[bool, Any, Optional[Exception]]:
332        if self.async_loop is None:
333            return True, None, AsyncioLoopWasNotSetError()
334        
335        coro_id = self.current_caller_coro_info.coro_id
336        if intercommunication_request:
337            self.intercommunication_requests_coro_ids.add(coro_id)
338        
339        async def awaiting_worker(asyncio_loop_instance: AsyncioLoop, coro_id: CoroID, awaitable: Awaitable):
340            exception = None
341            result = None
342            try:
343                result = await awaitable
344            except:
345                exception = get_exception()
346            
347            asyncio_loop_instance.register_await_response(coro_id, result, exception)
348        
349        create_task(self.async_loop, awaiting_worker, self, coro_id, awaitable)
350        
351        self.pending_requests_num += 1
352        if prevent_idle:
353            self.no_idle_calls.add(coro_id)
354        
355        self.make_live()
356        
357        return False, None, None
358
359    def _on_create_task(self, awaitable: Awaitable) -> Tuple[bool, Optional[asyncio_Task], Optional[Exception]]:
360        if self.async_loop is None:
361            return True, None, AsyncioLoopWasNotSetError()
362
363        async def awaiting_wrapper(awaitable: Awaitable):
364            return await awaitable
365
366        task: asyncio_Task = create_task(self.async_loop, awaiting_wrapper, awaitable)
367        return True, task, None
368    
369    def _on__internal_loop_yield(self) -> Tuple[bool, None, None]:
370        if self.pending_requests_num:
371            return True, None, None
372        else:
373            self.internal_loop_in_yield = True
374            return False, None, None
375    
376    def _on__internal_wait_for_new_requests(self) -> Tuple[bool, None, None]:
377        if self.new_requests_num:
378            return True, None, None
379        else:
380            self.waiting_for_new_requests = True
381            return False, None, None
382    
383    def register_new_asyncio_request(self) -> None:
384        self.new_requests_num += 1
385        self.make_live()
386    
387    def add_on_idle_handler(self) -> None:
388        if not self.use_higher_level_sleep_manager:
389            self.current_on_idle_handler = self._on_system_loop_idle
390            self._loop.on_idle_handlers.add(self._on_system_loop_idle)
391    
392    def discard_on_idle_handler(self) -> None:
393        self._loop.on_idle_handlers.discard(self._on_system_loop_idle)
394        self.current_on_idle_handler = None
395    
396    def _on__use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool) -> Tuple[bool, Optional[None], None]:
397        self.use_higher_level_sleep_manager = use_higher_level_sleep_manager
398        if use_higher_level_sleep_manager:
399            if self.current_on_idle_handler is not None:
400                self._loop.on_idle_handlers.add(self.current_on_idle_handler)
401        else:
402            self.discard_on_idle_handler()
403        
404        return True, None, None
405
406    def _on__low_latency_io_mode(self, low_latency_io_mode: bool) -> Tuple[bool, Optional[bool], None]:
407        buff_low_latency_io_mode = self.low_latency_io_mode > 0
408        if low_latency_io_mode:
409            self.low_latency_io_mode += 1
410        else:
411            self.low_latency_io_mode -= 1
412        
413        return True, buff_low_latency_io_mode, None
414    
415    def _on_turn_on_loops_intercommunication(self, turn_on: bool) -> Tuple[bool, Optional[Callable], None]:
416        result = self._loop.on_wrong_request
417        if turn_on:
418            self._previous_on_wrong_request = self._loop.on_wrong_request
419            self.loops_intercommunication = True
420            self._loop.on_wrong_request = self._on_wrong_request
421        else:
422            if self.loops_intercommunication:
423                self._loop.on_wrong_request = self._previous_on_wrong_request
424                self.loops_intercommunication = False
425            
426            self._previous_on_wrong_request = None
427        
428        return True, result, None
429    
430    def _on_wrong_request(self, coro: CoroWrapperBase, request: Any) -> Request:
431        if request is None:
432            args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_sleep_0()))
433            result: Request = Request(coro, type(self), *args, **kwargs)
434        else:
435            args, kwargs = args_kwargs(AsyncioLoopRequest()._wait_intercommunication(asyncio_coro_request(request)))
436            result = Request(coro, type(self), *args, **kwargs)
437
438        return result
439    
440    def inline_get(self):
441        if self.async_loop is None:
442            raise AsyncioLoopWasNotSetError
443        else:
444            return self.async_loop
445    
446    def inline_set_internal_loop(self, loop, exception: Optional[Exception]):
447        self.internal_async_loop = loop
448        self.internal_loop_creation_error = exception
449        self.make_live()
450    
451    def is_need_to_yield_internal_loop(self) -> bool:
452        return not self.pending_requests_num
AsyncioLoop( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
141    def __init__(self, loop: CoroSchedulerType):
142        super(AsyncioLoop, self).__init__(loop)
143        self.async_loop: Optional[AbstractEventLoop] = None
144        self.internal_async_loop: Optional[AbstractEventLoop] = None
145        self._internal_loop_holding_coro: Optional[CoroWrapperBase] = None
146        self.internal_loop_start_waiters: Set[CoroID] = set()
147        self.need_to_stop_internal_loop: bool = False
148        self.internal_loop_creation_error: Optional[Exception] = None
149        self.internal_loop_in_yield: bool = False
150        self.waiting_for_new_requests: bool = False
151        self.loops_intercommunication: bool = False
152        self._previous_on_wrong_request = None
153        self.intercommunication_requests_coro_ids: Set[CoroID] = set()
154
155        self._request_workers = {
156            0: self._on_inherit_surrounding_loop,
157            1: self._on_start_internal_loop,
158            2: self._on_ensure_loop,
159            3: self._on_set,
160            4: self._on_get,
161            5: self._on_await,
162            6: self._on_create_task,
163            7: self._on__internal_loop_yield,
164            8: self._on_turn_on_loops_intercommunication,
165            9: self._on_await,
166            10: self._on_await,
167            11: self._on__internal_wait_for_new_requests, 
168            12: self._on__use_higher_level_sleep_manager, 
169            13: self._on__low_latency_io_mode, 
170        }
171        
172        self.pending_requests_num: int = 0
173        self.new_requests_num: int = 0
174        self.no_idle_calls: Set[CoroID] = set()
175        self.results: Dict[CoroID, Tuple[Any, Exception]] = dict()
176        self._waiting_coro_id: Optional[CoroID] = None
177        self._original_loop_class: Type = None
178        self._idle_for: Optional[RationalNumber] = None  # in seconds
179        self.use_higher_level_sleep_manager: bool = False
180        self.current_on_idle_handler: Optional[Callable] = None
181        self.low_latency_io_mode: int = 0
async_loop: Union[asyncio.events.AbstractEventLoop, NoneType]
internal_async_loop: Union[asyncio.events.AbstractEventLoop, NoneType]
internal_loop_start_waiters: Set[int]
need_to_stop_internal_loop: bool
internal_loop_creation_error: Union[Exception, NoneType]
internal_loop_in_yield: bool
waiting_for_new_requests: bool
loops_intercommunication: bool
intercommunication_requests_coro_ids: Set[int]
pending_requests_num: int
new_requests_num: int
no_idle_calls: Set[int]
results: Dict[int, Tuple[Any, Exception]]
use_higher_level_sleep_manager: bool
current_on_idle_handler: Union[Callable, NoneType]
low_latency_io_mode: int
def destroy(self):
183    def destroy(self):
184        if self.internal_async_loop is not None:
185            events._set_running_loop(None)
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
193    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
194        return type(self).__name__, {
195            'pending requests num': self.pending_requests_num
196        }
def single_task_registration_or_immediate_processing( self, request: Union[AsyncioLoopRequest, NoneType] = None) -> Tuple[bool, NoneType, NoneType]:
198    def single_task_registration_or_immediate_processing(
199            self, request: Optional[AsyncioLoopRequest]=None) -> Tuple[bool, None, None]:
200        if request is not None:
201            return self.resolve_request(request)
202        return True, None, None
def full_processing_iteration(self):
204    def full_processing_iteration(self):
205        from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
206        if (self._waiting_coro_id is not None) and self.new_requests_num:
207            # throw_coro_service: ThrowCoro = self._loop.get_service_instance(ThrowCoro)
208            # throw_coro_service._add_direct_request(self._waiting_coro_id, WaitingCancelled)
209            kill_coro_service: KillCoro = self._loop.get_service_instance(KillCoro)
210            kill_coro_service._add_direct_request(self._waiting_coro_id)
211            self._waiting_coro_id = None
212            try_send_async_event(self._loop, WAITING_FOR_NEW_REQUESTS_EVENT, None)
213
214        if self.internal_loop_in_yield:
215            if self.pending_requests_num:
216                self.register_response(self._internal_loop_holding_coro.coro_id, None, None)
217                self.internal_loop_in_yield = False
218
219        if self.internal_loop_start_waiters:
220            loop_response = None
221            exception_response = None
222            if self.internal_async_loop is not None:
223                loop_response = self.async_loop = self.internal_async_loop
224            elif self.internal_loop_creation_error is not None:
225                exception_response = self.internal_loop_creation_error
226            
227            if loop_response or exception_response:
228                for coro_id in self.internal_loop_start_waiters:
229                    self.register_response(coro_id, loop_response, exception_response)
230
231                self.internal_loop_start_waiters = type(self.internal_loop_start_waiters)()
232
233        for coro_id, response in self.results.items():
234            result, exception = response
235            if coro_id in self.intercommunication_requests_coro_ids:
236                self.intercommunication_requests_coro_ids.remove(coro_id)
237                self._responses.append(DirectResponse(coro_id, type(self), result, exception))
238            else:
239                self.register_response(coro_id, result, exception)
240        
241        self.pending_requests_num -= len(self.results)
242        self.results = type(self.results)()
243        
244        if not self.no_idle_calls:
245            self.make_dead()
def is_low_latency(self) -> bool:
247    def is_low_latency(self) -> bool:
248        return True
def in_work(self) -> bool:
250    def in_work(self) -> bool:
251        result: bool = bool(self.internal_loop_start_waiters) | bool(self.pending_requests_num) | bool(self.results)
252        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def register_await_response( self, coro_id: int, response: Any, exception: Union[Exception, NoneType]):
326    def register_await_response(self, coro_id: CoroID, response: Any, exception: Optional[Exception]):
327        self.results[coro_id] = (response, exception)
328        self.no_idle_calls.discard(coro_id)
329        self.make_live()
def register_new_asyncio_request(self) -> None:
383    def register_new_asyncio_request(self) -> None:
384        self.new_requests_num += 1
385        self.make_live()
def add_on_idle_handler(self) -> None:
387    def add_on_idle_handler(self) -> None:
388        if not self.use_higher_level_sleep_manager:
389            self.current_on_idle_handler = self._on_system_loop_idle
390            self._loop.on_idle_handlers.add(self._on_system_loop_idle)
def discard_on_idle_handler(self) -> None:
392    def discard_on_idle_handler(self) -> None:
393        self._loop.on_idle_handlers.discard(self._on_system_loop_idle)
394        self.current_on_idle_handler = None
def inline_get(self):
440    def inline_get(self):
441        if self.async_loop is None:
442            raise AsyncioLoopWasNotSetError
443        else:
444            return self.async_loop
def inline_set_internal_loop(self, loop, exception: Union[Exception, NoneType]):
446    def inline_set_internal_loop(self, loop, exception: Optional[Exception]):
447        self.internal_async_loop = loop
448        self.internal_loop_creation_error = exception
449        self.make_live()
def is_need_to_yield_internal_loop(self) -> bool:
451    def is_need_to_yield_internal_loop(self) -> bool:
452        return not self.pending_requests_num
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
make_live
make_dead
service_id_impl
service_id
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class AsyncioLoopRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
 85class AsyncioLoopRequest(ServiceRequest):
 86    def inherit_surrounding_loop(self) -> AbstractEventLoop:
 87        return self._save(0)
 88
 89    def start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> AbstractEventLoop:
 90        return self._save(1, main_awaitable, priority, interrupt_when_no_requests, debug)
 91
 92    def ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> AbstractEventLoop:
 93        return self._save(2, main_awaitable, priority, interrupt_when_no_requests)
 94
 95    def set(self, async_loop) -> None:
 96        return self._save(3, async_loop)
 97
 98    def get(self) -> AbstractEventLoop:
 99        return self._save(4)
100
101    def wait(self, awaitable: Awaitable) -> Any:
102        return self._save(5, awaitable, False, True)
103
104    def create_task(self, awaitable: Awaitable) -> asyncio_Task:
105        return self._save(6, awaitable)
106
107    def _internal_loop_yield(self) -> None:
108        return self._save(7)
109
110    def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Optional[Callable]:
111        return self._save(8, turn_on)
112
113    def _wait_intercommunication(self, awaitable: Awaitable) -> Any:
114        return self._save(9, awaitable, True, True)
115
116    def wait_idle(self, awaitable: Awaitable) -> Any:
117        return self._save(10, awaitable, False, False)
118
119    def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any:
120        return self._save(12, use_higher_level_sleep_manager)
121
122    def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any:
123        return self._save(13, low_latency_io_mode)
def inherit_surrounding_loop(self) -> asyncio.events.AbstractEventLoop:
86    def inherit_surrounding_loop(self) -> AbstractEventLoop:
87        return self._save(0)
def start_internal_loop( self, main_awaitable: Union[Awaitable, NoneType] = None, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = None, interrupt_when_no_requests: Union[bool, NoneType] = None, debug: bool = False) -> asyncio.events.AbstractEventLoop:
89    def start_internal_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, debug: bool = False) -> AbstractEventLoop:
90        return self._save(1, main_awaitable, priority, interrupt_when_no_requests, debug)
def ensure_loop( self, main_awaitable: Union[Awaitable, NoneType] = None, priority: Union[cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority, NoneType] = None, interrupt_when_no_requests: Union[bool, NoneType] = None) -> asyncio.events.AbstractEventLoop:
92    def ensure_loop(self, main_awaitable: Optional[Awaitable] = None, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None) -> AbstractEventLoop:
93        return self._save(2, main_awaitable, priority, interrupt_when_no_requests)
def set(self, async_loop) -> None:
95    def set(self, async_loop) -> None:
96        return self._save(3, async_loop)
def get(self) -> asyncio.events.AbstractEventLoop:
98    def get(self) -> AbstractEventLoop:
99        return self._save(4)
def wait(self, awaitable: Awaitable) -> Any:
101    def wait(self, awaitable: Awaitable) -> Any:
102        return self._save(5, awaitable, False, True)
def create_task(self, awaitable: Awaitable) -> _asyncio.Task:
104    def create_task(self, awaitable: Awaitable) -> asyncio_Task:
105        return self._save(6, awaitable)
def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Union[Callable, NoneType]:
110    def turn_on_loops_intercommunication(self, turn_on: bool = True) -> Optional[Callable]:
111        return self._save(8, turn_on)
def wait_idle(self, awaitable: Awaitable) -> Any:
116    def wait_idle(self, awaitable: Awaitable) -> Any:
117        return self._save(10, awaitable, False, False)
def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any:
119    def use_higher_level_sleep_manager(self, use_higher_level_sleep_manager: bool = True) -> Any:
120        return self._save(12, use_higher_level_sleep_manager)
def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any:
122    def turn_on_low_latency_io_mode(self, low_latency_io_mode: bool = True) -> Any:
123        return self._save(13, low_latency_io_mode)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'AsyncioLoop'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class AsyncioLoopWasNotSetError(builtins.Exception):
70class AsyncioLoopWasNotSetError(Exception):
71    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def run_in_thread_pool(function: Callable, *args, **kwargs) -> Any:
658def run_in_thread_pool(function: Callable, *args, **kwargs) -> Any:
659    return run_in_thread_pool_fast(current_interface(), function, *args, **kwargs)
def run_in_thread_pool_fast( interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, function: Callable, *args, **kwargs) -> Any:
649def run_in_thread_pool_fast(interface: Interface, function: Callable, *args, **kwargs) -> Any:
650    async def task_wrapper(loop, *args, **kwargs):
651        return await task_in_thread_pool(loop, function, *args, **kwargs)
652
653    return interface(AsyncioLoop, AsyncioLoopRequest().wait(task_wrapper(
654        interface._loop.get_service_instance(AsyncioLoop).inline_get(), 
655        *args, **kwargs)))