cengal.parallel_execution.coroutines.coro_standard_services.throw_coro_list.versions.v_0.throw_coro_list

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['ThrowSingleCoroParams', 'ThrowCoroList', 'throw_coro_list_on', 'try_throw_coro_list_on', 'athrow_coro_list_on', 'atry_throw_coro_list_on', 'throw_coro_list', 'try_throw_coro_list', 'athrow_coro_list', 'atry_throw_coro_list']
 38
 39
 40from cengal.parallel_execution.coroutines.coro_scheduler import *
 41from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 42from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 43from cengal.code_flow_control.smart_values import ValueExistence
 44from cengal.introspection.inspect import get_exception, get_exception_tripple
 45from typing import Sequence, Tuple, List, Optional, Any, Type, cast, Dict, Union, Set
 46
 47
 48class ThrowSingleCoroParams:
 49    def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Exception = None, ex_traceback: Any = None, tree: bool = False) -> None:
 50        self.coro_id: CoroID = coro_id
 51        self.ex_type: Type[Exception] = ex_type
 52        self.ex_value: Exception = ex_value
 53        self.ex_traceback: Any = ex_traceback
 54        self.tree: bool = tree
 55    
 56    def __call__(self, new_coro_id: Optional[CoroID] = None) -> Tuple:
 57        return self.coro_id if new_coro_id is None else new_coro_id, self.ex_type, self.ex_value, self.ex_traceback
 58    
 59    def __bool__(self):
 60        return self.tree
 61
 62
 63class ThrowCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 64    def __init__(self, loop: CoroSchedulerType):
 65        super(ThrowCoroList, self).__init__(loop)
 66        self.direct_requests: List[Tuple] = list()
 67
 68    def single_task_registration_or_immediate_processing(
 69            self, coro_list: Sequence[ThrowSingleCoroParams]
 70    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
 71        results = list()
 72        try:
 73            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 74            for request in coro_list:
 75                result = None
 76                exception = None
 77                try:
 78                    if request:
 79                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
 80                    
 81                    result = self._loop.throw_coro_by_id(*request())
 82                    if request:
 83                        for child_coro_id in children:
 84                            self._loop.throw_coro_by_id(*request(child_coro_id))
 85                except:
 86                    exception = get_exception()
 87
 88                results.append((result, exception))
 89        except:
 90            return True, results, get_exception()
 91
 92        return True, results, None
 93
 94    def full_processing_iteration(self):
 95        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 96        direct_requests_buff = self.direct_requests
 97        self.direct_requests = type(direct_requests_buff)()
 98        for coro_list in direct_requests_buff:
 99            for request in coro_list:
100                try:
101                    if request:
102                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
103
104                    self._loop.throw_coro_by_id(*request())
105                    if request:
106                        for child_coro_id in children:
107                            self._loop.throw_coro_by_id(*request(child_coro_id))
108                except:
109                    ex_type, exception, tracback = get_exception_tripple()
110                    if __debug__: dlog(ex_type, exception, tracback)
111                    raise
112
113        self.make_dead()
114    
115    def _add_direct_request(self, coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[None]:
116        self.direct_requests.append(coro_list)
117        self.make_live()
118        return (False, None)
119
120    def in_work(self) -> bool:
121        result: bool = bool(self.direct_requests)
122        return self.thrifty_in_work(result)
123
124
125def throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
126    return make_request_to_service_with_context(context, ThrowCoroList, coro_list)
127
128
129def try_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
130    return try_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
131
132
133async def athrow_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
134    return await amake_request_to_service_with_context(context, ThrowCoroList, coro_list)
135
136
137async def atry_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
138    return await atry_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
139
140
141def throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
142    return make_request_to_service(ThrowCoroList, coro_list)
143
144
145def try_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
146    return try_make_request_to_service(ThrowCoroList, coro_list)
147
148
149async def athrow_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
150    return await amake_request_to_service(ThrowCoroList, coro_list)
151
152
153async def atry_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
154    return await atry_make_request_to_service(ThrowCoroList, coro_list)
class ThrowSingleCoroParams:
49class ThrowSingleCoroParams:
50    def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Exception = None, ex_traceback: Any = None, tree: bool = False) -> None:
51        self.coro_id: CoroID = coro_id
52        self.ex_type: Type[Exception] = ex_type
53        self.ex_value: Exception = ex_value
54        self.ex_traceback: Any = ex_traceback
55        self.tree: bool = tree
56    
57    def __call__(self, new_coro_id: Optional[CoroID] = None) -> Tuple:
58        return self.coro_id if new_coro_id is None else new_coro_id, self.ex_type, self.ex_value, self.ex_traceback
59    
60    def __bool__(self):
61        return self.tree
ThrowSingleCoroParams( coro_id: int, ex_type: typing.Type[Exception], ex_value: Exception = None, ex_traceback: typing.Any = None, tree: bool = False)
50    def __init__(self, coro_id: CoroID, ex_type: Type[Exception], ex_value: Exception = None, ex_traceback: Any = None, tree: bool = False) -> None:
51        self.coro_id: CoroID = coro_id
52        self.ex_type: Type[Exception] = ex_type
53        self.ex_value: Exception = ex_value
54        self.ex_traceback: Any = ex_traceback
55        self.tree: bool = tree
coro_id: int
ex_type: Type[Exception]
ex_value: Exception
ex_traceback: Any
tree: bool
class ThrowCoroList(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[typing.List[typing.Tuple[typing.Union[bool, NoneType], typing.Union[Exception, NoneType]]]], cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin):
 64class ThrowCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 65    def __init__(self, loop: CoroSchedulerType):
 66        super(ThrowCoroList, self).__init__(loop)
 67        self.direct_requests: List[Tuple] = list()
 68
 69    def single_task_registration_or_immediate_processing(
 70            self, coro_list: Sequence[ThrowSingleCoroParams]
 71    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
 72        results = list()
 73        try:
 74            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 75            for request in coro_list:
 76                result = None
 77                exception = None
 78                try:
 79                    if request:
 80                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
 81                    
 82                    result = self._loop.throw_coro_by_id(*request())
 83                    if request:
 84                        for child_coro_id in children:
 85                            self._loop.throw_coro_by_id(*request(child_coro_id))
 86                except:
 87                    exception = get_exception()
 88
 89                results.append((result, exception))
 90        except:
 91            return True, results, get_exception()
 92
 93        return True, results, None
 94
 95    def full_processing_iteration(self):
 96        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 97        direct_requests_buff = self.direct_requests
 98        self.direct_requests = type(direct_requests_buff)()
 99        for coro_list in direct_requests_buff:
100            for request in coro_list:
101                try:
102                    if request:
103                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
104
105                    self._loop.throw_coro_by_id(*request())
106                    if request:
107                        for child_coro_id in children:
108                            self._loop.throw_coro_by_id(*request(child_coro_id))
109                except:
110                    ex_type, exception, tracback = get_exception_tripple()
111                    if __debug__: dlog(ex_type, exception, tracback)
112                    raise
113
114        self.make_dead()
115    
116    def _add_direct_request(self, coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[None]:
117        self.direct_requests.append(coro_list)
118        self.make_live()
119        return (False, None)
120
121    def in_work(self) -> bool:
122        result: bool = bool(self.direct_requests)
123        return self.thrifty_in_work(result)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

ThrowCoroList( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
65    def __init__(self, loop: CoroSchedulerType):
66        super(ThrowCoroList, self).__init__(loop)
67        self.direct_requests: List[Tuple] = list()
direct_requests: List[Tuple]
def single_task_registration_or_immediate_processing( self, coro_list: typing.Sequence[ThrowSingleCoroParams]) -> Tuple[bool, Sequence[Tuple[bool, Union[Exception, NoneType]]], Any]:
69    def single_task_registration_or_immediate_processing(
70            self, coro_list: Sequence[ThrowSingleCoroParams]
71    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
72        results = list()
73        try:
74            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
75            for request in coro_list:
76                result = None
77                exception = None
78                try:
79                    if request:
80                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
81                    
82                    result = self._loop.throw_coro_by_id(*request())
83                    if request:
84                        for child_coro_id in children:
85                            self._loop.throw_coro_by_id(*request(child_coro_id))
86                except:
87                    exception = get_exception()
88
89                results.append((result, exception))
90        except:
91            return True, results, get_exception()
92
93        return True, results, None
def full_processing_iteration(self):
 95    def full_processing_iteration(self):
 96        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 97        direct_requests_buff = self.direct_requests
 98        self.direct_requests = type(direct_requests_buff)()
 99        for coro_list in direct_requests_buff:
100            for request in coro_list:
101                try:
102                    if request:
103                        children: Set[CoroID] = put_coro.get_set_of_all_children(request.coro_id)
104
105                    self._loop.throw_coro_by_id(*request())
106                    if request:
107                        for child_coro_id in children:
108                            self._loop.throw_coro_by_id(*request(child_coro_id))
109                except:
110                    ex_type, exception, tracback = get_exception_tripple()
111                    if __debug__: dlog(ex_type, exception, tracback)
112                    raise
113
114        self.make_dead()
def in_work(self) -> bool:
121    def in_work(self) -> bool:
122        result: bool = bool(self.direct_requests)
123        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
def throw_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
126def throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
127    return make_request_to_service_with_context(context, ThrowCoroList, coro_list)
def try_throw_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
130def try_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
131    return try_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
async def athrow_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
134async def athrow_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
135    return await amake_request_to_service_with_context(context, ThrowCoroList, coro_list)
async def atry_throw_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
138async def atry_throw_coro_list_on(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
139    return await atry_make_request_to_service_with_context(context, ThrowCoroList, coro_list)
def throw_coro_list( coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
142def throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
143    return make_request_to_service(ThrowCoroList, coro_list)
def try_throw_coro_list( coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
146def try_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
147    return try_make_request_to_service(ThrowCoroList, coro_list)
async def athrow_coro_list( coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
150async def athrow_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
151    return await amake_request_to_service(ThrowCoroList, coro_list)
async def atry_throw_coro_list( coro_list: typing.Sequence[ThrowSingleCoroParams]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
154async def atry_throw_coro_list(coro_list: Sequence[ThrowSingleCoroParams]) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
155    return await atry_make_request_to_service(ThrowCoroList, coro_list)