cengal.parallel_execution.asyncio.run_loop.versions.v_0.run_loop

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['run_forever', 'cancel_all_tasks']
 20
 21
 22"""
 23Module Docstring
 24Docstrings: http://www.python.org/dev/peps/pep-0257/
 25"""
 26
 27__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 28__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 29__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 30__license__ = "Apache License, Version 2.0"
 31__version__ = "4.4.1"
 32__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 33__email__ = "gtalk@butenkoms.space"
 34# __status__ = "Prototype"
 35__status__ = "Development"
 36# __status__ = "Production"
 37from asyncio import coroutines
 38from asyncio import events
 39from asyncio import tasks
 40
 41
 42def run_forever(main, *, debug=False):
 43    
 44    """Execute the coroutine and all started tasks and return the result.
 45
 46    This function runs the passed coroutine, taking care of
 47    managing the asyncio event loop and finalizing asynchronous
 48    generators.
 49
 50    This function cannot be called when another asyncio event loop is
 51    running in the same thread.
 52
 53    If debug is True, the event loop will be run in debug mode.
 54
 55    This function always creates a new event loop and closes it at the end.
 56    It should be used as a main entry point for asyncio programs, and should
 57    ideally only be called once.
 58
 59    Example:
 60        async def my_task():
 61            await asyncio.sleep(3)
 62            print('will be finished before loop ends unlike with the asyncio.run() call')
 63
 64
 65        async def main():
 66            asyncio.create_task(my_task())
 67            await asyncio.sleep(1)
 68            print('hello')
 69
 70
 71        run_forever(main())
 72    """
 73    if events._get_running_loop() is not None:
 74        raise RuntimeError(
 75            'run_forever() cannot be called from a running event loop')
 76
 77    if not coroutines.iscoroutine(main):
 78        raise ValueError('a coroutine was expected, got {!r}'.format(main))
 79
 80    loop = events.new_event_loop()
 81    try:
 82        events.set_event_loop(loop)
 83        loop.set_debug(debug)
 84        loop.create_task(main)
 85        return loop.run_forever()
 86    finally:
 87        try:
 88            cancel_all_tasks(loop)
 89            loop.run_until_complete(loop.shutdown_asyncgens())
 90        finally:
 91            events.set_event_loop(None)
 92            loop.close()
 93
 94
 95def cancel_all_tasks(loop):
 96    to_cancel = tasks.all_tasks(loop)
 97    if not to_cancel:
 98        return
 99
100    for task in to_cancel:
101        task.cancel()
102
103    loop.run_until_complete(
104        tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
105
106    for task in to_cancel:
107        if task.cancelled():
108            continue
109        if task.exception() is not None:
110            loop.call_exception_handler({
111                'message': 'unhandled exception during run_forever() shutdown',
112                'exception': task.exception(),
113                'task': task,
114            })
def run_forever(main, *, debug=False):
43def run_forever(main, *, debug=False):
44    
45    """Execute the coroutine and all started tasks and return the result.
46
47    This function runs the passed coroutine, taking care of
48    managing the asyncio event loop and finalizing asynchronous
49    generators.
50
51    This function cannot be called when another asyncio event loop is
52    running in the same thread.
53
54    If debug is True, the event loop will be run in debug mode.
55
56    This function always creates a new event loop and closes it at the end.
57    It should be used as a main entry point for asyncio programs, and should
58    ideally only be called once.
59
60    Example:
61        async def my_task():
62            await asyncio.sleep(3)
63            print('will be finished before loop ends unlike with the asyncio.run() call')
64
65
66        async def main():
67            asyncio.create_task(my_task())
68            await asyncio.sleep(1)
69            print('hello')
70
71
72        run_forever(main())
73    """
74    if events._get_running_loop() is not None:
75        raise RuntimeError(
76            'run_forever() cannot be called from a running event loop')
77
78    if not coroutines.iscoroutine(main):
79        raise ValueError('a coroutine was expected, got {!r}'.format(main))
80
81    loop = events.new_event_loop()
82    try:
83        events.set_event_loop(loop)
84        loop.set_debug(debug)
85        loop.create_task(main)
86        return loop.run_forever()
87    finally:
88        try:
89            cancel_all_tasks(loop)
90            loop.run_until_complete(loop.shutdown_asyncgens())
91        finally:
92            events.set_event_loop(None)
93            loop.close()

Execute the coroutine and all started tasks and return the result.

This function runs the passed coroutine, taking care of managing the asyncio event loop and finalizing asynchronous generators.

This function cannot be called when another asyncio event loop is running in the same thread.

If debug is True, the event loop will be run in debug mode.

This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

Example: async def my_task(): await asyncio.sleep(3) print('will be finished before loop ends unlike with the asyncio.run() call')

async def main():
    asyncio.create_task(my_task())
    await asyncio.sleep(1)
    print('hello')


run_forever(main())
def cancel_all_tasks(loop):
 96def cancel_all_tasks(loop):
 97    to_cancel = tasks.all_tasks(loop)
 98    if not to_cancel:
 99        return
100
101    for task in to_cancel:
102        task.cancel()
103
104    loop.run_until_complete(
105        tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
106
107    for task in to_cancel:
108        if task.cancelled():
109            continue
110        if task.exception() is not None:
111            loop.call_exception_handler({
112                'message': 'unhandled exception during run_forever() shutdown',
113                'exception': task.exception(),
114                'task': task,
115            })