cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list.versions.v_0.put_coro_list

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['PutSingleCoroParams', 'PSCP', 'PutCoroList', 'put_coro_list_to', 'try_put_coro_list_to', 'aput_coro_list_to', 'atry_put_coro_list_to', 'put_coro_list', 'try_put_coro_list', 'aput_coro_list', 'atry_put_coro_list']
 38
 39
 40from cengal.parallel_execution.coroutines.coro_scheduler import *
 41from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro, Task
 42from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 43from cengal.code_flow_control.smart_values import ValueExistence
 44from cengal.introspection.inspect import get_exception, get_exception_tripple
 45from cengal.code_flow_control.args_manager import EntityArgsHolder
 46from typing import Sequence, Tuple, List, Optional, Any, cast, Dict, Union
 47
 48
 49class PutSingleCoroParams:
 50    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
 51        self.coro_worker: AnyWorker = coro_worker
 52        self.args: Tuple = args
 53        self.kwargs: Dict = kwargs
 54    
 55    def __call__(self) -> Any:
 56        return self.coro_worker, self.args, self.kwargs
 57
 58
 59PSCP = PutSingleCoroParams
 60
 61
 62class PutCoroList(TypedService[List[Tuple[Optional[CoroID], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 63    def __init__(self, loop: CoroSchedulerType):
 64        super(PutCoroList, self).__init__(loop)
 65        self.direct_requests: List[Tuple] = list()
 66
 67    def single_task_registration_or_immediate_processing(
 68            self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
 69    ) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
 70        results = list()
 71        try:
 72            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 73            caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
 74            for request in coro_list:
 75                result = None
 76                exception = None
 77                try:
 78                    coro_worker, args, kwargs = request()
 79                    if tasks:
 80                        result = put_coro.put_task_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
 81                    else:
 82                        coro = put_coro.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
 83                        result = coro.coro_id
 84                except:
 85                    exception = get_exception()
 86
 87                results.append((result, exception))
 88        except:
 89            return True, results, get_exception()
 90
 91        return True, results, None
 92
 93    def full_processing_iteration(self):
 94        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 95        direct_requests_buff = self.direct_requests
 96        self.direct_requests = type(direct_requests_buff)()
 97        for coro_list in direct_requests_buff:
 98            for request in coro_list:
 99                coro_worker, args, kwargs = request()
100                try:
101                    coro = put_coro.put_root_from_other_service(coro_worker, *args, **kwargs)
102                except:
103                    ex_type, exception, tracback = get_exception_tripple()
104                    if __debug__: dlog(ex_type, exception, tracback)
105                    raise
106
107        self.make_dead()
108    
109    def _add_direct_request(self, coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[None]:
110        self.direct_requests.append(coro_list)
111        self.make_live()
112        return (False, None)
113
114    def in_work(self) -> bool:
115        result = bool(self.direct_requests)
116        return self.thrifty_in_work(result)
117
118
def put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
    """Send ``coro_list`` to the ``PutCoroList`` service using an explicit context.

    Args:
        context: ``(scheduler, interface, bool)`` triple, forwarded unchanged to
            ``make_request_to_service_with_context``.
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        Whatever ``make_request_to_service_with_context`` returns.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = make_request_to_service_with_context(context, PutCoroList, coro_list)
    return outcome
121
122
def try_put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
    """Try to send ``coro_list`` to the ``PutCoroList`` service using an explicit context.

    Args:
        context: ``(scheduler, interface, bool)`` triple, forwarded unchanged to
            ``try_make_request_to_service_with_context``.
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        Whatever ``try_make_request_to_service_with_context`` returns.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = try_make_request_to_service_with_context(context, PutCoroList, coro_list)
    return outcome
125
126
async def aput_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
    """Awaitable variant: send ``coro_list`` to ``PutCoroList`` using an explicit context.

    Args:
        context: ``(scheduler, interface, bool)`` triple, forwarded unchanged to
            ``amake_request_to_service_with_context``.
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        The awaited result of ``amake_request_to_service_with_context``.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = await amake_request_to_service_with_context(context, PutCoroList, coro_list)
    return outcome
129
130
async def atry_put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
    """Awaitable variant: try to send ``coro_list`` to ``PutCoroList`` using an explicit context.

    Args:
        context: ``(scheduler, interface, bool)`` triple, forwarded unchanged to
            ``atry_make_request_to_service_with_context``.
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        The awaited result of ``atry_make_request_to_service_with_context``.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = await atry_make_request_to_service_with_context(context, PutCoroList, coro_list)
    return outcome
133
134
def put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
    """Send ``coro_list`` to the ``PutCoroList`` service.

    Args:
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        Whatever ``make_request_to_service`` returns.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = make_request_to_service(PutCoroList, coro_list)
    return outcome
137
138
def try_put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
    """Try to send ``coro_list`` to the ``PutCoroList`` service.

    Args:
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        Whatever ``try_make_request_to_service`` returns.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = try_make_request_to_service(PutCoroList, coro_list)
    return outcome
141
142
async def aput_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
    """Awaitable variant: send ``coro_list`` to the ``PutCoroList`` service.

    Args:
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        The awaited result of ``amake_request_to_service``.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = await amake_request_to_service(PutCoroList, coro_list)
    return outcome
145
146
async def atry_put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
    """Awaitable variant: try to send ``coro_list`` to the ``PutCoroList`` service.

    Args:
        coro_list: Parameter bundles describing the coroutines to start.

    Returns:
        The awaited result of ``atry_make_request_to_service``.
    """
    # NOTE(review): PutCoroList is declared as a TypedService over a list of
    # (coro id, exception) pairs, so the wrapped value is presumably that list
    # rather than a single CoroID — confirm before relying on the annotation.
    outcome = await atry_make_request_to_service(PutCoroList, coro_list)
    return outcome
class PutSingleCoroParams:
50class PutSingleCoroParams:
51    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
52        self.coro_worker: AnyWorker = coro_worker
53        self.args: Tuple = args
54        self.kwargs: Dict = kwargs
55    
56    def __call__(self) -> Any:
57        return self.coro_worker, self.args, self.kwargs
PutSingleCoroParams( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs)
51    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
52        self.coro_worker: AnyWorker = coro_worker
53        self.args: Tuple = args
54        self.kwargs: Dict = kwargs
coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]]]
args: Tuple
kwargs: Dict
PSCP = <class 'PutSingleCoroParams'>
class PutCoroList(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[typing.List[typing.Tuple[typing.Union[int, NoneType], typing.Union[Exception, NoneType]]]], cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin):
 63class PutCoroList(TypedService[List[Tuple[Optional[CoroID], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 64    def __init__(self, loop: CoroSchedulerType):
 65        super(PutCoroList, self).__init__(loop)
 66        self.direct_requests: List[Tuple] = list()
 67
 68    def single_task_registration_or_immediate_processing(
 69            self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
 70    ) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
 71        results = list()
 72        try:
 73            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 74            caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
 75            for request in coro_list:
 76                result = None
 77                exception = None
 78                try:
 79                    coro_worker, args, kwargs = request()
 80                    if tasks:
 81                        result = put_coro.put_task_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
 82                    else:
 83                        coro = put_coro.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
 84                        result = coro.coro_id
 85                except:
 86                    exception = get_exception()
 87
 88                results.append((result, exception))
 89        except:
 90            return True, results, get_exception()
 91
 92        return True, results, None
 93
 94    def full_processing_iteration(self):
 95        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 96        direct_requests_buff = self.direct_requests
 97        self.direct_requests = type(direct_requests_buff)()
 98        for coro_list in direct_requests_buff:
 99            for request in coro_list:
100                coro_worker, args, kwargs = request()
101                try:
102                    coro = put_coro.put_root_from_other_service(coro_worker, *args, **kwargs)
103                except:
104                    ex_type, exception, tracback = get_exception_tripple()
105                    if __debug__: dlog(ex_type, exception, tracback)
106                    raise
107
108        self.make_dead()
109    
110    def _add_direct_request(self, coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[None]:
111        self.direct_requests.append(coro_list)
112        self.make_live()
113        return (False, None)
114
115    def in_work(self) -> bool:
116        result = bool(self.direct_requests)
117        return self.thrifty_in_work(result)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

PutCoroList( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
64    def __init__(self, loop: CoroSchedulerType):
65        super(PutCoroList, self).__init__(loop)
66        self.direct_requests: List[Tuple] = list()
direct_requests: List[Tuple]
def single_task_registration_or_immediate_processing( self, coro_list: typing.Sequence[PutSingleCoroParams], tasks: bool = False) -> Tuple[bool, Union[List[int], List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], Any]:
68    def single_task_registration_or_immediate_processing(
69            self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
70    ) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
71        results = list()
72        try:
73            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
74            caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
75            for request in coro_list:
76                result = None
77                exception = None
78                try:
79                    coro_worker, args, kwargs = request()
80                    if tasks:
81                        result = put_coro.put_task_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
82                    else:
83                        coro = put_coro.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
84                        result = coro.coro_id
85                except:
86                    exception = get_exception()
87
88                results.append((result, exception))
89        except:
90            return True, results, get_exception()
91
92        return True, results, None
def full_processing_iteration(self):
 94    def full_processing_iteration(self):
 95        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 96        direct_requests_buff = self.direct_requests
 97        self.direct_requests = type(direct_requests_buff)()
 98        for coro_list in direct_requests_buff:
 99            for request in coro_list:
100                coro_worker, args, kwargs = request()
101                try:
102                    coro = put_coro.put_root_from_other_service(coro_worker, *args, **kwargs)
103                except:
104                    ex_type, exception, tracback = get_exception_tripple()
105                    if __debug__: dlog(ex_type, exception, tracback)
106                    raise
107
108        self.make_dead()
def in_work(self) -> bool:
115    def in_work(self) -> bool:
116        result = bool(self.direct_requests)
117        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

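Raises: NotImplementedError: listed in the inherited method documentation; the PutCoroList.in_work override shown above contains no raise statement of its own.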

Returns: bool: the value of thrifty_in_work(result), where result is True when direct_requests is non-empty.

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
def put_coro_list_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
120def put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
121    return make_request_to_service_with_context(context, PutCoroList, coro_list)
def try_put_coro_list_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
124def try_put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
125    return try_make_request_to_service_with_context(context, PutCoroList, coro_list)
async def aput_coro_list_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
128async def aput_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
129    return await amake_request_to_service_with_context(context, PutCoroList, coro_list)
async def atry_put_coro_list_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
132async def atry_put_coro_list_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
133    return await atry_make_request_to_service_with_context(context, PutCoroList, coro_list)
def put_coro_list( coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
136def put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
137    return make_request_to_service(PutCoroList, coro_list)
def try_put_coro_list( coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
140def try_put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
141    return try_make_request_to_service(PutCoroList, coro_list)
async def aput_coro_list( coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
144async def aput_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[CoroID]:
145    return await amake_request_to_service(PutCoroList, coro_list)
async def atry_put_coro_list( coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
148async def atry_put_coro_list(coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[Optional[CoroID]]:
149    return await atry_make_request_to_service(PutCoroList, coro_list)