cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop.versions.v_0.known_asyncio_compatible_loops

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['prepare_loop', 'restore_loop', 'DerivedFromProactorEventLoop', 'DerivedFromSelectorEventLoop', 'DerivedFromUVLoop']
 38
 39
 40from cengal.parallel_execution.coroutines.coro_scheduler import CoroSchedulerType, Interface, Coro, AnyWorker, current_interface, current_coro_scheduler, cs_coro, cs_acoro
 41from cengal.data_manipulation.conversion.reinterpret_cast import reinterpret_cast
 42from cengal.data_manipulation.conversion.reinterpret_cast_management.manager import BaseAutoDerivedObjWrapper
 43from asyncio import SelectorEventLoop, AbstractEventLoop
 44_proactor_present = False
 45try:
 46    from asyncio import ProactorEventLoop
 47    _proactor_present = True
 48except ImportError:
 49    pass
 50
 51from uvloop import Loop as UVLoop
 52from typing import Type, Tuple, Dict, Callable, Any
 53
 54
 55def call_soon(self, callback, *args, **kwargs):
 56    from .asyncio_loop import AsyncioLoop
 57    cs: CoroSchedulerType = self._cs
 58    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 59    service.register_new_asyncio_request()
 60    return SelectorEventLoop.call_soon(self, callback, *args, **kwargs)
 61
 62def call_later(self, delay, callback, *args, **kwargs):
 63    from .asyncio_loop import AsyncioLoop
 64    cs: CoroSchedulerType = self._cs
 65    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 66    service.register_new_asyncio_request()
 67    return SelectorEventLoop.call_later(self, delay, callback, *args, **kwargs)
 68
 69def call_at(self, when, callback, *args, **kwargs):
 70    from .asyncio_loop import AsyncioLoop
 71    cs: CoroSchedulerType = self._cs
 72    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 73    service.register_new_asyncio_request()
 74    return SelectorEventLoop.call_at(self, when, callback, *args, **kwargs)
 75
 76# Method scheduling a coroutine object: create a task.
 77
 78def create_task(self, coro, **kwargs):
 79    from .asyncio_loop import AsyncioLoop
 80    cs: CoroSchedulerType = self._cs
 81    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 82    service.register_new_asyncio_request()
 83    return SelectorEventLoop.create_task(self, coro, **kwargs)
 84
 85# Methods for interacting with threads.
 86
 87def call_soon_threadsafe(self, callback, *args, **kwargs):
 88    from .asyncio_loop import AsyncioLoop
 89    cs: CoroSchedulerType = self._cs
 90    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 91    service.register_new_asyncio_request()
 92    return SelectorEventLoop.call_soon_threadsafe(self, callback, *args, **kwargs)
 93
 94def run_in_executor(self, executor, func, *args):
 95    from .asyncio_loop import AsyncioLoop
 96    cs: CoroSchedulerType = self._cs
 97    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
 98    service.register_new_asyncio_request()
 99    return SelectorEventLoop.run_in_executor(self, executor, func, *args)
100
101def add_reader(self, fd, callback, *args):
102    from .asyncio_loop import AsyncioLoop
103    cs: CoroSchedulerType = self._cs
104    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
105    service.register_new_asyncio_request()
106    return SelectorEventLoop.add_reader(self, fd, callback, *args)
107
108def add_writer(self, fd, callback, *args):
109    from .asyncio_loop import AsyncioLoop
110    cs: CoroSchedulerType = self._cs
111    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
112    service.register_new_asyncio_request()
113    return SelectorEventLoop.add_writer(self, fd, callback, *args)
114
115# Signal handling.
116
117def add_signal_handler(self, sig, callback, *args):
118    from .asyncio_loop import AsyncioLoop
119    cs: CoroSchedulerType = self._cs
120    service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
121    service.register_new_asyncio_request()
122    return SelectorEventLoop.add_signal_handler(self, sig, callback, *args)
123
124
125class LoopWrapper(BaseAutoDerivedObjWrapper):
126    def wrapping_required(self, obj: Any, base_type: Type, fields: Tuple[str], planned_type_name: str) -> bool:
127        return issubclass(base_type, AbstractEventLoop)
128    
129    def methods(self, obj: Any, base_type: Type, fields: Tuple[str]) -> Dict[str, Callable]:
130        return {
131            'call_soon': call_soon,
132            'call_later': call_later,
133            'call_at': call_at,
134            'create_task': create_task,
135            'call_soon_threadsafe': call_soon_threadsafe,
136            'run_in_executor': run_in_executor,
137            'add_reader': add_reader,
138            'add_writer': add_writer,
139            'add_signal_handler': add_signal_handler,
140        }
141
142
143_lw: LoopWrapper = LoopWrapper()
144
145
146def prepare_loop(loop: AbstractEventLoop) -> Type:
147    if _proactor_present and type(loop) is ProactorEventLoop:
148        reinterpret_cast(DerivedFromProactorEventLoop, loop)
149        loop._cs = current_coro_scheduler()
150        return ProactorEventLoop
151    elif type(loop) is SelectorEventLoop:
152        reinterpret_cast(DerivedFromSelectorEventLoop, loop)
153        loop._cs = current_coro_scheduler()
154        return SelectorEventLoop
155    elif type(loop) is UVLoop:
156        reinterpret_cast(DerivedFromUVLoop, loop)
157        loop._cs = current_coro_scheduler()
158        return UVLoop
159    elif isinstance(loop, AbstractEventLoop):
160        original_type: Type = type(loop)
161        _lw(loop)
162        loop._cs = current_coro_scheduler()
163        return original_type
164    else:
165        raise TypeError('Unknown loop type: {}'.format(type(loop)))
166
167
168def restore_loop(loop: AbstractEventLoop, original_type: Type) -> None:
169    reinterpret_cast(original_type, loop)
170
171
172if _proactor_present:
173    class DerivedFromProactorEventLoop(ProactorEventLoop):
174        def call_soon(self, callback, *args, **kwargs):
175            from .asyncio_loop import AsyncioLoop
176            cs: CoroSchedulerType = self._cs
177            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
178            service.register_new_asyncio_request()
179            return ProactorEventLoop.call_soon(self, callback, *args, **kwargs)
180
181        def call_later(self, delay, callback, *args, **kwargs):
182            from .asyncio_loop import AsyncioLoop
183            cs: CoroSchedulerType = self._cs
184            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
185            service.register_new_asyncio_request()
186            return ProactorEventLoop.call_later(self, delay, callback, *args, **kwargs)
187
188        def call_at(self, when, callback, *args, **kwargs):
189            from .asyncio_loop import AsyncioLoop
190            cs: CoroSchedulerType = self._cs
191            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
192            service.register_new_asyncio_request()
193            return ProactorEventLoop.call_at(self, when, callback, *args, **kwargs)
194
195        # Method scheduling a coroutine object: create a task.
196
197        def create_task(self, coro, **kwargs):
198            from .asyncio_loop import AsyncioLoop
199            cs: CoroSchedulerType = self._cs
200            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
201            service.register_new_asyncio_request()
202            return ProactorEventLoop.create_task(self, coro, **kwargs)
203
204        # Methods for interacting with threads.
205
206        def call_soon_threadsafe(self, callback, *args, **kwargs):
207            from .asyncio_loop import AsyncioLoop
208            cs: CoroSchedulerType = self._cs
209            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
210            service.register_new_asyncio_request()
211            return ProactorEventLoop.call_soon_threadsafe(self, callback, *args, **kwargs)
212
213        def run_in_executor(self, executor, func, *args):
214            from .asyncio_loop import AsyncioLoop
215            cs: CoroSchedulerType = self._cs
216            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
217            service.register_new_asyncio_request()
218            return ProactorEventLoop.run_in_executor(self, executor, func, *args)
219
220        def add_reader(self, fd, callback, *args):
221            from .asyncio_loop import AsyncioLoop
222            cs: CoroSchedulerType = self._cs
223            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
224            service.register_new_asyncio_request()
225            return ProactorEventLoop.add_reader(self, fd, callback, *args)
226
227        def add_writer(self, fd, callback, *args):
228            from .asyncio_loop import AsyncioLoop
229            cs: CoroSchedulerType = self._cs
230            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
231            service.register_new_asyncio_request()
232            return ProactorEventLoop.add_writer(self, fd, callback, *args)
233
234        # Signal handling.
235
236        def add_signal_handler(self, sig, callback, *args):
237            from .asyncio_loop import AsyncioLoop
238            cs: CoroSchedulerType = self._cs
239            service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
240            service.register_new_asyncio_request()
241            return ProactorEventLoop.add_signal_handler(self, sig, callback, *args)
242
243
244class DerivedFromSelectorEventLoop(SelectorEventLoop):
245    def call_soon(self, callback, *args, **kwargs):
246        from .asyncio_loop import AsyncioLoop
247        cs: CoroSchedulerType = self._cs
248        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
249        service.register_new_asyncio_request()
250        return SelectorEventLoop.call_soon(self, callback, *args, **kwargs)
251
252    def call_later(self, delay, callback, *args, **kwargs):
253        from .asyncio_loop import AsyncioLoop
254        cs: CoroSchedulerType = self._cs
255        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
256        service.register_new_asyncio_request()
257        return SelectorEventLoop.call_later(self, delay, callback, *args, **kwargs)
258
259    def call_at(self, when, callback, *args, **kwargs):
260        from .asyncio_loop import AsyncioLoop
261        cs: CoroSchedulerType = self._cs
262        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
263        service.register_new_asyncio_request()
264        return SelectorEventLoop.call_at(self, when, callback, *args, **kwargs)
265
266    # Method scheduling a coroutine object: create a task.
267
268    def create_task(self, coro, **kwargs):
269        from .asyncio_loop import AsyncioLoop
270        cs: CoroSchedulerType = self._cs
271        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
272        service.register_new_asyncio_request()
273        return SelectorEventLoop.create_task(self, coro, **kwargs)
274
275    # Methods for interacting with threads.
276
277    def call_soon_threadsafe(self, callback, *args, **kwargs):
278        from .asyncio_loop import AsyncioLoop
279        cs: CoroSchedulerType = self._cs
280        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
281        service.register_new_asyncio_request()
282        return SelectorEventLoop.call_soon_threadsafe(self, callback, *args, **kwargs)
283
284    def run_in_executor(self, executor, func, *args):
285        from .asyncio_loop import AsyncioLoop
286        cs: CoroSchedulerType = self._cs
287        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
288        service.register_new_asyncio_request()
289        return SelectorEventLoop.run_in_executor(self, executor, func, *args)
290
291    def add_reader(self, fd, callback, *args):
292        from .asyncio_loop import AsyncioLoop
293        cs: CoroSchedulerType = self._cs
294        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
295        service.register_new_asyncio_request()
296        return SelectorEventLoop.add_reader(self, fd, callback, *args)
297
298    def add_writer(self, fd, callback, *args):
299        from .asyncio_loop import AsyncioLoop
300        cs: CoroSchedulerType = self._cs
301        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
302        service.register_new_asyncio_request()
303        return SelectorEventLoop.add_writer(self, fd, callback, *args)
304
305    # Signal handling.
306
307    def add_signal_handler(self, sig, callback, *args):
308        from .asyncio_loop import AsyncioLoop
309        cs: CoroSchedulerType = self._cs
310        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
311        service.register_new_asyncio_request()
312        return SelectorEventLoop.add_signal_handler(self, sig, callback, *args)
313
314
315class DerivedFromUVLoop(UVLoop):
316    def call_soon(self, callback, *args, context=None):
317        from .asyncio_loop import AsyncioLoop
318        cs: CoroSchedulerType = self._cs
319        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
320        service.register_new_asyncio_request()
321        return UVLoop.call_soon(self, callback, *args, context=context)
322
323    def call_later(self, delay, callback, *args, context=None):
324        from .asyncio_loop import AsyncioLoop
325        cs: CoroSchedulerType = self._cs
326        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
327        service.register_new_asyncio_request()
328        return UVLoop.call_later(self, delay, callback, *args, context=context)
329
330    def call_at(self, when, callback, *args, context=None):
331        from .asyncio_loop import AsyncioLoop
332        cs: CoroSchedulerType = self._cs
333        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
334        service.register_new_asyncio_request()
335        return UVLoop.call_at(self, when, callback, *args, context=context)
336
337    # Method scheduling a coroutine object: create a task.
338
339    def create_task(self, coro, *, name=None, context=None):
340        from .asyncio_loop import AsyncioLoop
341        cs: CoroSchedulerType = self._cs
342        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
343        service.register_new_asyncio_request()
344        return UVLoop.create_task(self, coro, name=name, context=context)
345
346    # Methods for interacting with threads.
347
348    def call_soon_threadsafe(self, callback, *args, context=None):
349        from .asyncio_loop import AsyncioLoop
350        cs: CoroSchedulerType = self._cs
351        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
352        service.register_new_asyncio_request()
353        return UVLoop.call_soon_threadsafe(self, callback, *args, context=context)
354
355    def run_in_executor(self, executor, func, *args):
356        from .asyncio_loop import AsyncioLoop
357        cs: CoroSchedulerType = self._cs
358        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
359        service.register_new_asyncio_request()
360        return UVLoop.run_in_executor(self, executor, func, *args)
361
362    def add_reader(self, fd, callback, *args):
363        from .asyncio_loop import AsyncioLoop
364        cs: CoroSchedulerType = self._cs
365        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
366        service.register_new_asyncio_request()
367        return UVLoop.add_reader(self, fd, callback, *args)
368
369    def add_writer(self, fd, callback, *args):
370        from .asyncio_loop import AsyncioLoop
371        cs: CoroSchedulerType = self._cs
372        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
373        service.register_new_asyncio_request()
374        return UVLoop.add_writer(self, fd, callback, *args)
375
376    # Signal handling.
377
378    def add_signal_handler(self, sig, callback, *args):
379        from .asyncio_loop import AsyncioLoop
380        cs: CoroSchedulerType = self._cs
381        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
382        service.register_new_asyncio_request()
383        return UVLoop.add_signal_handler(self, sig, callback, *args)
def prepare_loop(loop: asyncio.events.AbstractEventLoop) -> Type:
147def prepare_loop(loop: AbstractEventLoop) -> Type:
148    if _proactor_present and type(loop) is ProactorEventLoop:
149        reinterpret_cast(DerivedFromProactorEventLoop, loop)
150        loop._cs = current_coro_scheduler()
151        return ProactorEventLoop
152    elif type(loop) is SelectorEventLoop:
153        reinterpret_cast(DerivedFromSelectorEventLoop, loop)
154        loop._cs = current_coro_scheduler()
155        return SelectorEventLoop
156    elif type(loop) is UVLoop:
157        reinterpret_cast(DerivedFromUVLoop, loop)
158        loop._cs = current_coro_scheduler()
159        return UVLoop
160    elif isinstance(loop, AbstractEventLoop):
161        original_type: Type = type(loop)
162        _lw(loop)
163        loop._cs = current_coro_scheduler()
164        return original_type
165    else:
166        raise TypeError('Unknown loop type: {}'.format(type(loop)))
def restore_loop( loop: asyncio.events.AbstractEventLoop, original_type: typing.Type) -> None:
169def restore_loop(loop: AbstractEventLoop, original_type: Type) -> None:
170    reinterpret_cast(original_type, loop)
DerivedFromProactorEventLoop
class DerivedFromSelectorEventLoop(asyncio.unix_events._UnixSelectorEventLoop):
245class DerivedFromSelectorEventLoop(SelectorEventLoop):
246    def call_soon(self, callback, *args, **kwargs):
247        from .asyncio_loop import AsyncioLoop
248        cs: CoroSchedulerType = self._cs
249        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
250        service.register_new_asyncio_request()
251        return SelectorEventLoop.call_soon(self, callback, *args, **kwargs)
252
253    def call_later(self, delay, callback, *args, **kwargs):
254        from .asyncio_loop import AsyncioLoop
255        cs: CoroSchedulerType = self._cs
256        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
257        service.register_new_asyncio_request()
258        return SelectorEventLoop.call_later(self, delay, callback, *args, **kwargs)
259
260    def call_at(self, when, callback, *args, **kwargs):
261        from .asyncio_loop import AsyncioLoop
262        cs: CoroSchedulerType = self._cs
263        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
264        service.register_new_asyncio_request()
265        return SelectorEventLoop.call_at(self, when, callback, *args, **kwargs)
266
267    # Method scheduling a coroutine object: create a task.
268
269    def create_task(self, coro, **kwargs):
270        from .asyncio_loop import AsyncioLoop
271        cs: CoroSchedulerType = self._cs
272        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
273        service.register_new_asyncio_request()
274        return SelectorEventLoop.create_task(self, coro, **kwargs)
275
276    # Methods for interacting with threads.
277
278    def call_soon_threadsafe(self, callback, *args, **kwargs):
279        from .asyncio_loop import AsyncioLoop
280        cs: CoroSchedulerType = self._cs
281        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
282        service.register_new_asyncio_request()
283        return SelectorEventLoop.call_soon_threadsafe(self, callback, *args, **kwargs)
284
285    def run_in_executor(self, executor, func, *args):
286        from .asyncio_loop import AsyncioLoop
287        cs: CoroSchedulerType = self._cs
288        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
289        service.register_new_asyncio_request()
290        return SelectorEventLoop.run_in_executor(self, executor, func, *args)
291
292    def add_reader(self, fd, callback, *args):
293        from .asyncio_loop import AsyncioLoop
294        cs: CoroSchedulerType = self._cs
295        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
296        service.register_new_asyncio_request()
297        return SelectorEventLoop.add_reader(self, fd, callback, *args)
298
299    def add_writer(self, fd, callback, *args):
300        from .asyncio_loop import AsyncioLoop
301        cs: CoroSchedulerType = self._cs
302        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
303        service.register_new_asyncio_request()
304        return SelectorEventLoop.add_writer(self, fd, callback, *args)
305
306    # Signal handling.
307
308    def add_signal_handler(self, sig, callback, *args):
309        from .asyncio_loop import AsyncioLoop
310        cs: CoroSchedulerType = self._cs
311        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
312        service.register_new_asyncio_request()
313        return SelectorEventLoop.add_signal_handler(self, sig, callback, *args)

Unix event loop.

Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.

def call_soon(self, callback, *args, **kwargs):
246    def call_soon(self, callback, *args, **kwargs):
247        from .asyncio_loop import AsyncioLoop
248        cs: CoroSchedulerType = self._cs
249        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
250        service.register_new_asyncio_request()
251        return SelectorEventLoop.call_soon(self, callback, *args, **kwargs)

Arrange for a callback to be called as soon as possible.

This operates as a FIFO queue: callbacks are called in the order in which they are registered. Each callback will be called exactly once.

Any positional arguments after the callback will be passed to the callback when it is called.

def call_later(self, delay, callback, *args, **kwargs):
253    def call_later(self, delay, callback, *args, **kwargs):
254        from .asyncio_loop import AsyncioLoop
255        cs: CoroSchedulerType = self._cs
256        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
257        service.register_new_asyncio_request()
258        return SelectorEventLoop.call_later(self, delay, callback, *args, **kwargs)

Arrange for a callback to be called at a given time.

Return a Handle: an opaque object with a cancel() method that can be used to cancel the call.

The delay can be an int or float, expressed in seconds. It is always relative to the current time.

Each callback will be called exactly once. If two callbacks are scheduled for exactly the same time, it undefined which will be called first.

Any positional arguments after the callback will be passed to the callback when it is called.

def call_at(self, when, callback, *args, **kwargs):
260    def call_at(self, when, callback, *args, **kwargs):
261        from .asyncio_loop import AsyncioLoop
262        cs: CoroSchedulerType = self._cs
263        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
264        service.register_new_asyncio_request()
265        return SelectorEventLoop.call_at(self, when, callback, *args, **kwargs)

Like call_later(), but uses an absolute time.

Absolute time corresponds to the event loop's time() method.

def create_task(self, coro, **kwargs):
269    def create_task(self, coro, **kwargs):
270        from .asyncio_loop import AsyncioLoop
271        cs: CoroSchedulerType = self._cs
272        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
273        service.register_new_asyncio_request()
274        return SelectorEventLoop.create_task(self, coro, **kwargs)

Schedule a coroutine object.

Return a task object.

def call_soon_threadsafe(self, callback, *args, **kwargs):
278    def call_soon_threadsafe(self, callback, *args, **kwargs):
279        from .asyncio_loop import AsyncioLoop
280        cs: CoroSchedulerType = self._cs
281        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
282        service.register_new_asyncio_request()
283        return SelectorEventLoop.call_soon_threadsafe(self, callback, *args, **kwargs)

Like call_soon(), but thread-safe.

def run_in_executor(self, executor, func, *args):
285    def run_in_executor(self, executor, func, *args):
286        from .asyncio_loop import AsyncioLoop
287        cs: CoroSchedulerType = self._cs
288        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
289        service.register_new_asyncio_request()
290        return SelectorEventLoop.run_in_executor(self, executor, func, *args)
def add_reader(self, fd, callback, *args):
292    def add_reader(self, fd, callback, *args):
293        from .asyncio_loop import AsyncioLoop
294        cs: CoroSchedulerType = self._cs
295        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
296        service.register_new_asyncio_request()
297        return SelectorEventLoop.add_reader(self, fd, callback, *args)

Add a reader callback.

def add_writer(self, fd, callback, *args):
299    def add_writer(self, fd, callback, *args):
300        from .asyncio_loop import AsyncioLoop
301        cs: CoroSchedulerType = self._cs
302        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
303        service.register_new_asyncio_request()
304        return SelectorEventLoop.add_writer(self, fd, callback, *args)

Add a writer callback..

def add_signal_handler(self, sig, callback, *args):
308    def add_signal_handler(self, sig, callback, *args):
309        from .asyncio_loop import AsyncioLoop
310        cs: CoroSchedulerType = self._cs
311        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
312        service.register_new_asyncio_request()
313        return SelectorEventLoop.add_signal_handler(self, sig, callback, *args)

Add a handler for a signal. UNIX only.

Raise ValueError if the signal number is invalid or uncatchable. Raise RuntimeError if there is a problem setting up the handler.

Inherited Members
asyncio.unix_events._UnixSelectorEventLoop
_UnixSelectorEventLoop
close
remove_signal_handler
create_unix_connection
create_unix_server
asyncio.selector_events.BaseSelectorEventLoop
remove_reader
remove_writer
sock_recv
sock_recv_into
sock_sendall
sock_connect
sock_accept
asyncio.base_events.BaseEventLoop
slow_callback_duration
create_future
set_task_factory
get_task_factory
shutdown_asyncgens
run_forever
run_until_complete
stop
is_closed
is_running
time
set_default_executor
getaddrinfo
getnameinfo
sock_sendfile
create_connection
sendfile
start_tls
create_datagram_endpoint
create_server
connect_accepted_socket
connect_read_pipe
connect_write_pipe
subprocess_shell
subprocess_exec
get_exception_handler
set_exception_handler
default_exception_handler
call_exception_handler
get_debug
set_debug
class DerivedFromUVLoop(uvloop.Loop):
316class DerivedFromUVLoop(UVLoop):
317    def call_soon(self, callback, *args, context=None):
318        from .asyncio_loop import AsyncioLoop
319        cs: CoroSchedulerType = self._cs
320        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
321        service.register_new_asyncio_request()
322        return UVLoop.call_soon(self, callback, *args, context=context)
323
324    def call_later(self, delay, callback, *args, context=None):
325        from .asyncio_loop import AsyncioLoop
326        cs: CoroSchedulerType = self._cs
327        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
328        service.register_new_asyncio_request()
329        return UVLoop.call_later(self, delay, callback, *args, context=context)
330
331    def call_at(self, when, callback, *args, context=None):
332        from .asyncio_loop import AsyncioLoop
333        cs: CoroSchedulerType = self._cs
334        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
335        service.register_new_asyncio_request()
336        return UVLoop.call_at(self, when, callback, *args, context=context)
337
338    # Method scheduling a coroutine object: create a task.
339
340    def create_task(self, coro, *, name=None, context=None):
341        from .asyncio_loop import AsyncioLoop
342        cs: CoroSchedulerType = self._cs
343        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
344        service.register_new_asyncio_request()
345        return UVLoop.create_task(self, coro, name=name, context=context)
346
347    # Methods for interacting with threads.
348
349    def call_soon_threadsafe(self, callback, *args, context=None):
350        from .asyncio_loop import AsyncioLoop
351        cs: CoroSchedulerType = self._cs
352        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
353        service.register_new_asyncio_request()
354        return UVLoop.call_soon_threadsafe(self, callback, *args, context=context)
355
356    def run_in_executor(self, executor, func, *args):
357        from .asyncio_loop import AsyncioLoop
358        cs: CoroSchedulerType = self._cs
359        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
360        service.register_new_asyncio_request()
361        return UVLoop.run_in_executor(self, executor, func, *args)
362
363    def add_reader(self, fd, callback, *args):
364        from .asyncio_loop import AsyncioLoop
365        cs: CoroSchedulerType = self._cs
366        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
367        service.register_new_asyncio_request()
368        return UVLoop.add_reader(self, fd, callback, *args)
369
370    def add_writer(self, fd, callback, *args):
371        from .asyncio_loop import AsyncioLoop
372        cs: CoroSchedulerType = self._cs
373        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
374        service.register_new_asyncio_request()
375        return UVLoop.add_writer(self, fd, callback, *args)
376
377    # Signal handling.
378
379    def add_signal_handler(self, sig, callback, *args):
380        from .asyncio_loop import AsyncioLoop
381        cs: CoroSchedulerType = self._cs
382        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
383        service.register_new_asyncio_request()
384        return UVLoop.add_signal_handler(self, sig, callback, *args)

Loop()

def call_soon(self, callback, *args, context=None):
317    def call_soon(self, callback, *args, context=None):
318        from .asyncio_loop import AsyncioLoop
319        cs: CoroSchedulerType = self._cs
320        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
321        service.register_new_asyncio_request()
322        return UVLoop.call_soon(self, callback, *args, context=context)

Loop.call_soon(self, callback, *args, context=None) Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
def call_later(self, delay, callback, *args, context=None):
324    def call_later(self, delay, callback, *args, context=None):
325        from .asyncio_loop import AsyncioLoop
326        cs: CoroSchedulerType = self._cs
327        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
328        service.register_new_asyncio_request()
329        return UVLoop.call_later(self, delay, callback, *args, context=context)

Loop.call_later(self, delay, callback, *args, context=None) Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
def call_at(self, when, callback, *args, context=None):
331    def call_at(self, when, callback, *args, context=None):
332        from .asyncio_loop import AsyncioLoop
333        cs: CoroSchedulerType = self._cs
334        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
335        service.register_new_asyncio_request()
336        return UVLoop.call_at(self, when, callback, *args, context=context)

Loop.call_at(self, when, callback, *args, context=None) Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.
def create_task(self, coro, *, name=None, context=None):
340    def create_task(self, coro, *, name=None, context=None):
341        from .asyncio_loop import AsyncioLoop
342        cs: CoroSchedulerType = self._cs
343        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
344        service.register_new_asyncio_request()
345        return UVLoop.create_task(self, coro, name=name, context=context)

Loop.create_task(self, coro, *, name=None, context=None) Schedule a coroutine object.

    Return a task object.

    If name is not None, task.set_name(name) will be called if the task
    object has the set_name attribute, true for default Task in Python 3.8.

    An optional keyword-only context argument allows specifying a custom
    contextvars.Context for the coro to run in. The current context copy is
    created when no context is provided.
def call_soon_threadsafe(self, callback, *args, context=None):
349    def call_soon_threadsafe(self, callback, *args, context=None):
350        from .asyncio_loop import AsyncioLoop
351        cs: CoroSchedulerType = self._cs
352        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
353        service.register_new_asyncio_request()
354        return UVLoop.call_soon_threadsafe(self, callback, *args, context=context)

Loop.call_soon_threadsafe(self, callback, *args, context=None) Like call_soon(), but thread-safe.

def run_in_executor(self, executor, func, *args):
356    def run_in_executor(self, executor, func, *args):
357        from .asyncio_loop import AsyncioLoop
358        cs: CoroSchedulerType = self._cs
359        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
360        service.register_new_asyncio_request()
361        return UVLoop.run_in_executor(self, executor, func, *args)

Loop.run_in_executor(self, executor, func, *args)

def add_reader(self, fd, callback, *args):
363    def add_reader(self, fd, callback, *args):
364        from .asyncio_loop import AsyncioLoop
365        cs: CoroSchedulerType = self._cs
366        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
367        service.register_new_asyncio_request()
368        return UVLoop.add_reader(self, fd, callback, *args)

Loop.add_reader(self, fileobj, callback, *args) Add a reader callback.

def add_writer(self, fd, callback, *args):
370    def add_writer(self, fd, callback, *args):
371        from .asyncio_loop import AsyncioLoop
372        cs: CoroSchedulerType = self._cs
373        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
374        service.register_new_asyncio_request()
375        return UVLoop.add_writer(self, fd, callback, *args)

Loop.add_writer(self, fileobj, callback, *args) Add a writer callback..

def add_signal_handler(self, sig, callback, *args):
379    def add_signal_handler(self, sig, callback, *args):
380        from .asyncio_loop import AsyncioLoop
381        cs: CoroSchedulerType = self._cs
382        service: AsyncioLoop = cs.get_service_instance(AsyncioLoop)
383        service.register_new_asyncio_request()
384        return UVLoop.add_signal_handler(self, sig, callback, *args)

Loop.add_signal_handler(self, sig, callback, *args) Add a handler for a signal. UNIX only.

    Raise ValueError if the signal number is invalid or uncatchable.
    Raise RuntimeError if there is a problem setting up the handler.
Inherited Members
uvloop.loop.Loop
Loop
run_forever
run_until_complete
stop
is_running
is_closed
close
shutdown_asyncgens
time
create_future
set_default_executor
getaddrinfo
getnameinfo
create_connection
create_server
start_tls
create_unix_connection
create_unix_server
create_datagram_endpoint
connect_read_pipe
connect_write_pipe
subprocess_shell
subprocess_exec
remove_reader
remove_writer
sock_recv
sock_recv_into
sock_sendall
sock_connect
sock_accept
remove_signal_handler
set_task_factory
get_task_factory
get_exception_handler
set_exception_handler
default_exception_handler
call_exception_handler
get_debug
set_debug
sock_recvfrom
sock_recvfrom_into
sock_sendto
connect_accepted_socket
shutdown_default_executor
print_debug_info
slow_callback_duration
asyncio.events.AbstractEventLoop
sendfile
sock_sendfile