cengal.parallel_execution.asyncio.atasks.versions.v_0.atasks

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

 1#!/usr/bin/env python
 2# coding=utf-8
 3
 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
 5# 
 6# Licensed under the Apache License, Version 2.0 (the "License");
 7# you may not use this file except in compliance with the License.
 8# You may obtain a copy of the License at
 9# 
10#     http://www.apache.org/licenses/LICENSE-2.0
11# 
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18"""
19Module Docstring
20Docstrings: http://www.python.org/dev/peps/pep-0257/
21"""
22
23__author__ = "ButenkoMS <gtalk@butenkoms.space>"
24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
26__license__ = "Apache License, Version 2.0"
27__version__ = "4.4.1"
28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
29__email__ = "gtalk@butenkoms.space"
30# __status__ = "Prototype"
31__status__ = "Development"
32# __status__ = "Production"
33__all__ = ['create_task_awaitable', 'create_task']
34
35
36import asyncio
37from cengal.system import PYTHON_VERSION_INT
38
39
40def create_task_awaitable(asyncio_awaitable) -> asyncio.Task:
41    if (3, 7) <= PYTHON_VERSION_INT:
42        return asyncio.create_task(asyncio_awaitable)
43    else:
44        return asyncio.get_event_loop().create_task(asyncio_awaitable)
45
46
47def create_task(asyncio_coroutine, *args, **kwargs) -> asyncio.Task:
48    if (3, 7) <= PYTHON_VERSION_INT:
49        return asyncio.create_task(asyncio_coroutine(*args, **kwargs))
50    else:
51        return asyncio.get_event_loop().create_task(asyncio_coroutine(*args, **kwargs))
def create_task_awaitable(asyncio_awaitable) -> _asyncio.Task:
41def create_task_awaitable(asyncio_awaitable) -> asyncio.Task:
42    if (3, 7) <= PYTHON_VERSION_INT:
43        return asyncio.create_task(asyncio_awaitable)
44    else:
45        return asyncio.get_event_loop().create_task(asyncio_awaitable)
def create_task(asyncio_coroutine, *args, **kwargs) -> _asyncio.Task:
48def create_task(asyncio_coroutine, *args, **kwargs) -> asyncio.Task:
49    if (3, 7) <= PYTHON_VERSION_INT:
50        return asyncio.create_task(asyncio_coroutine(*args, **kwargs))
51    else:
52        return asyncio.get_event_loop().create_task(asyncio_coroutine(*args, **kwargs))