cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18__all__ = [
 19    'ServiceWithADirectRequestMixin', 
 20    'put_request_to_service',
 21    'try_put_request_to_service',
 22    'put_request_to_service_with_context', 
 23    'try_put_request_to_service_with_context', 
 24    'make_request_to_service_with_context', 
 25    'try_make_request_to_service_with_context', 
 26    'amake_request_to_service_with_context', 
 27    'atry_make_request_to_service_with_context', 
 28    'make_request_to_service', 
 29    'try_make_request_to_service', 
 30    'amake_request_to_service', 
 31    'atry_make_request_to_service'
 32]
 33
 34
 35from cengal.parallel_execution.coroutines.coro_scheduler import *
 36from cengal.introspection.inspect import get_exception
 37from cengal.code_flow_control.smart_values import ValueExistence
 38from typing import Tuple, List, Optional, Any, Union, cast, Type
 39
 40"""
 41Module Docstring
 42Docstrings: http://www.python.org/dev/peps/pep-0257/
 43"""
 44
 45__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 46__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 47__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 48__license__ = "Apache License, Version 2.0"
 49__version__ = "4.4.1"
 50__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 51__email__ = "gtalk@butenkoms.space"
 52# __status__ = "Prototype"
 53__status__ = "Development"
 54# __status__ = "Production"
 55
 56
 57class ServiceWithADirectRequestMixin:
 58    def _add_direct_request(self, *args, **kwargs) -> ValueExistence:
 59        raise NotImplementedError
 60
 61
 62def put_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
 63    loop, interface, coro_alive = context
 64    if loop is None:
 65        raise CoroSchedulerContextIsNotAvailable
 66
 67    # Outside the loop or in the service
 68    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
 69    return service._add_direct_request(*args, **kwargs)
 70
 71
 72def try_put_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
 73    loop, interface, coro_alive = context
 74    if loop is None:
 75        return None
 76
 77    # Outside the loop or in the service
 78    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
 79    return service._add_direct_request(*args, **kwargs)
 80
 81
 82def make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
 83    loop, interface, coro_alive = context
 84    if loop is None:
 85        raise CoroSchedulerContextIsNotAvailable
 86
 87    if coro_alive and isinstance(interface, InterfaceGreenlet):
 88        # In greenlet coroutine
 89        return True, interface(service_type, *args, **kwargs)
 90    else:
 91        # Outside the loop or in the service
 92        service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
 93        return service._add_direct_request(*args, **kwargs)
 94
 95
 96def try_make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
 97    loop, interface, coro_alive = context
 98    if loop is None:
 99        return None
100
101    if coro_alive and isinstance(interface, InterfaceGreenlet):
102        # In greenlet coroutine
103        return True, interface(service_type, *args, **kwargs)
104    else:
105        # Outside the loop or in the service
106        service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
107        return service._add_direct_request(*args, **kwargs)
108
109
async def amake_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Async variant: send a request to a service, awaiting the interface when one is usable.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``(True, result)`` when the request was awaited through an
        ``InterfaceAsyncAwait``; otherwise whatever the service's
        ``_add_direct_request`` returns (that call is not awaited).

    Raises:
        CoroSchedulerContextIsNotAvailable: if the scheduler in ``context`` is ``None``.
    """
    scheduler, iface, is_coro_alive = context
    if scheduler is None:
        raise CoroSchedulerContextIsNotAvailable

    if not (is_coro_alive and isinstance(iface, InterfaceAsyncAwait)):
        # Outside the loop or in the service: go to the service instance directly.
        target: ServiceWithADirectRequestMixin = scheduler.get_service_instance(service_type)
        return target._add_direct_request(*args, **kwargs)

    # In an awaitable coroutine: await the request through the interface.
    # NOTE(review): this is a plain tuple, not the annotated ValueExistence -
    # confirm callers handle both shapes.
    return True, await iface(service_type, *args, **kwargs)
122
123
async def atry_make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Async variant: send a request to a service, or do nothing when no scheduler exists.

    Non-raising counterpart of ``amake_request_to_service_with_context``.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``None`` when the scheduler in ``context`` is ``None``;
        ``(True, result)`` when the request was awaited through an
        ``InterfaceAsyncAwait``; otherwise whatever the service's
        ``_add_direct_request`` returns (that call is not awaited). The
        return annotation is ``Optional`` to reflect the ``None`` case.
    """
    loop, interface, coro_alive = context
    if loop is None:
        return None

    if coro_alive and isinstance(interface, InterfaceAsyncAwait):
        # In an awaitable coroutine: await the request through the interface.
        # NOTE(review): this is a plain tuple, not a ValueExistence -
        # confirm callers handle both shapes.
        return True, await interface(service_type, *args, **kwargs)

    # Outside the loop or in the service: go to the service instance directly.
    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
    return service._add_direct_request(*args, **kwargs)
136
137
def put_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Hand a request straight to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``put_request_to_service_with_context``; ``args`` and
    ``kwargs`` are forwarded unchanged and its result is returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    return put_request_to_service_with_context(current_context, service_type, *args, **kwargs)
140
141
def try_put_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Hand a request straight to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``try_put_request_to_service_with_context``; ``args``
    and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped function's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return try_put_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)
144
145
def make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``make_request_to_service_with_context``; ``args`` and
    ``kwargs`` are forwarded unchanged and its result is returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    return make_request_to_service_with_context(current_context, service_type, *args, **kwargs)
148
149
def try_make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``try_make_request_to_service_with_context``; ``args``
    and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped function's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return try_make_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)
152
153
async def amake_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Async variant: send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper that awaits ``amake_request_to_service_with_context``;
    ``args`` and ``kwargs`` are forwarded unchanged and its result is
    returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    outcome = await amake_request_to_service_with_context(current_context, service_type, *args, **kwargs)
    return outcome
156
157
async def atry_make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Async variant: send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper that awaits ``atry_make_request_to_service_with_context``;
    ``args`` and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped coroutine's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return await atry_make_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)
class ServiceWithADirectRequestMixin:
    """Mixin for services that can receive a request directly on the service instance.

    The helper functions in this module look a service instance up by type
    and call its ``_add_direct_request`` method. The base implementation
    below only raises, so a concrete service is expected to override it.
    """

    def _add_direct_request(self, *args, **kwargs) -> ValueExistence:
        """Accept a request described by ``args`` and ``kwargs``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
def put_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Hand a request straight to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``put_request_to_service_with_context``; ``args`` and
    ``kwargs`` are forwarded unchanged and its result is returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    return put_request_to_service_with_context(current_context, service_type, *args, **kwargs)
def try_put_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Hand a request straight to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``try_put_request_to_service_with_context``; ``args``
    and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped function's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return try_put_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)
def put_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Hand a request straight to the service instance owned by the scheduler in ``context``.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple; only the
            scheduler is used here.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to ``_add_direct_request``.

    Returns:
        Whatever the service's ``_add_direct_request`` returns.

    Raises:
        CoroSchedulerContextIsNotAvailable: if the scheduler in ``context`` is ``None``.
    """
    # Unpack all three items so a malformed context still fails the same way.
    scheduler, _interface, _coro_alive = context
    if scheduler is None:
        raise CoroSchedulerContextIsNotAvailable

    # The request always goes directly to the service instance.
    target: ServiceWithADirectRequestMixin = scheduler.get_service_instance(service_type)
    return target._add_direct_request(*args, **kwargs)
def try_put_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Hand a request straight to the service instance, or do nothing when no scheduler exists.

    Non-raising counterpart of ``put_request_to_service_with_context``.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple; only the
            scheduler is used here.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to ``_add_direct_request``.

    Returns:
        ``None`` when the scheduler in ``context`` is ``None``; otherwise
        whatever the service's ``_add_direct_request`` returns. The return
        annotation is ``Optional`` to reflect the ``None`` case.
    """
    loop, _interface, _coro_alive = context
    if loop is None:
        return None

    # The request always goes directly to the service instance.
    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
    return service._add_direct_request(*args, **kwargs)
def make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Send a request to a service, through the coroutine interface when one is usable.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``(True, result)`` when the request went through an
        ``InterfaceGreenlet``; otherwise whatever the service's
        ``_add_direct_request`` returns.

    Raises:
        CoroSchedulerContextIsNotAvailable: if the scheduler in ``context`` is ``None``.
    """
    scheduler, iface, is_coro_alive = context
    if scheduler is None:
        raise CoroSchedulerContextIsNotAvailable

    if not (is_coro_alive and isinstance(iface, InterfaceGreenlet)):
        # Outside the loop or in the service: go to the service instance directly.
        target: ServiceWithADirectRequestMixin = scheduler.get_service_instance(service_type)
        return target._add_direct_request(*args, **kwargs)

    # In a greenlet coroutine: issue the request through the interface.
    # NOTE(review): this is a plain tuple, not the annotated ValueExistence -
    # confirm callers handle both shapes.
    return True, iface(service_type, *args, **kwargs)
def try_make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Send a request to a service, or do nothing when no scheduler exists.

    Non-raising counterpart of ``make_request_to_service_with_context``.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``None`` when the scheduler in ``context`` is ``None``;
        ``(True, result)`` when the request went through an
        ``InterfaceGreenlet``; otherwise whatever the service's
        ``_add_direct_request`` returns. The return annotation is
        ``Optional`` to reflect the ``None`` case.
    """
    loop, interface, coro_alive = context
    if loop is None:
        return None

    if coro_alive and isinstance(interface, InterfaceGreenlet):
        # In a greenlet coroutine: issue the request through the interface.
        # NOTE(review): this is a plain tuple, not a ValueExistence -
        # confirm callers handle both shapes.
        return True, interface(service_type, *args, **kwargs)

    # Outside the loop or in the service: go to the service instance directly.
    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
    return service._add_direct_request(*args, **kwargs)
async def amake_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Async variant: send a request to a service, awaiting the interface when one is usable.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``(True, result)`` when the request was awaited through an
        ``InterfaceAsyncAwait``; otherwise whatever the service's
        ``_add_direct_request`` returns (that call is not awaited).

    Raises:
        CoroSchedulerContextIsNotAvailable: if the scheduler in ``context`` is ``None``.
    """
    scheduler, iface, is_coro_alive = context
    if scheduler is None:
        raise CoroSchedulerContextIsNotAvailable

    if not (is_coro_alive and isinstance(iface, InterfaceAsyncAwait)):
        # Outside the loop or in the service: go to the service instance directly.
        target: ServiceWithADirectRequestMixin = scheduler.get_service_instance(service_type)
        return target._add_direct_request(*args, **kwargs)

    # In an awaitable coroutine: await the request through the interface.
    # NOTE(review): this is a plain tuple, not the annotated ValueExistence -
    # confirm callers handle both shapes.
    return True, await iface(service_type, *args, **kwargs)
async def atry_make_request_to_service_with_context(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Async variant: send a request to a service, or do nothing when no scheduler exists.

    Non-raising counterpart of ``amake_request_to_service_with_context``.

    Args:
        context: ``(scheduler, interface, coro_alive)`` triple.
        service_type: type of the service that should receive the request.
        *args, **kwargs: forwarded unchanged to the interface call or to
            ``_add_direct_request``.

    Returns:
        ``None`` when the scheduler in ``context`` is ``None``;
        ``(True, result)`` when the request was awaited through an
        ``InterfaceAsyncAwait``; otherwise whatever the service's
        ``_add_direct_request`` returns (that call is not awaited). The
        return annotation is ``Optional`` to reflect the ``None`` case.
    """
    loop, interface, coro_alive = context
    if loop is None:
        return None

    if coro_alive and isinstance(interface, InterfaceAsyncAwait):
        # In an awaitable coroutine: await the request through the interface.
        # NOTE(review): this is a plain tuple, not a ValueExistence -
        # confirm callers handle both shapes.
        return True, await interface(service_type, *args, **kwargs)

    # Outside the loop or in the service: go to the service instance directly.
    service: ServiceWithADirectRequestMixin = loop.get_service_instance(service_type)
    return service._add_direct_request(*args, **kwargs)
def make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``make_request_to_service_with_context``; ``args`` and
    ``kwargs`` are forwarded unchanged and its result is returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    return make_request_to_service_with_context(current_context, service_type, *args, **kwargs)
def try_make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper over ``try_make_request_to_service_with_context``; ``args``
    and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped function's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return try_make_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)
async def amake_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> ValueExistence:
    """Async variant: send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper that awaits ``amake_request_to_service_with_context``;
    ``args`` and ``kwargs`` are forwarded unchanged and its result is
    returned as is.
    """
    current_context = get_interface_and_loop_with_backup_loop()
    outcome = await amake_request_to_service_with_context(current_context, service_type, *args, **kwargs)
    return outcome
async def atry_make_request_to_service(service_type: Type[ServiceWithADirectRequestMixin], *args, **kwargs) -> Optional[ValueExistence]:
    """Async variant: send a request to a service, using the context from ``get_interface_and_loop_with_backup_loop()``.

    Thin wrapper that awaits ``atry_make_request_to_service_with_context``;
    ``args`` and ``kwargs`` are forwarded unchanged.

    Returns:
        The wrapped coroutine's result, which is ``None`` when no scheduler
        is available (hence the ``Optional`` return annotation).
    """
    return await atry_make_request_to_service_with_context(get_interface_and_loop_with_backup_loop(), service_type, *args, **kwargs)