cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list.versions.v_0.put_coro_list
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = ['PutSingleCoroParams', 'PSCP', 'PutCoroList', 'put_coro_list_to', 'try_put_coro_list_to', 'aput_coro_list_to', 'atry_put_coro_list_to', 'put_coro_list', 'try_put_coro_list', 'aput_coro_list', 'atry_put_coro_list'] 38 39 40from cengal.parallel_execution.coroutines.coro_scheduler import * 41from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro, Task 42from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import * 43from cengal.code_flow_control.smart_values import ValueExistence 44from cengal.introspection.inspect import get_exception, get_exception_tripple 45from cengal.code_flow_control.args_manager import EntityArgsHolder 
from typing import Sequence, Tuple, List, Optional, Any, cast, Dict, Union


class PutSingleCoroParams:
    """Bundle of one coroutine worker plus the arguments it should be started with.

    Calling the instance yields the stored ``(coro_worker, args, kwargs)`` triple.
    """

    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
        # Everything is stored as received; nothing is validated or copied.
        self.coro_worker: AnyWorker = coro_worker
        self.args: Tuple = args
        self.kwargs: Dict = kwargs

    def __call__(self) -> Any:
        """Return the stored request as a ``(coro_worker, args, kwargs)`` tuple."""
        request = (self.coro_worker, self.args, self.kwargs)
        return request


# Short alias for PutSingleCoroParams.
PSCP = PutSingleCoroParams


class PutCoroList(TypedService[List[Tuple[Optional[CoroID], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that submits a whole list of coroutines by delegating to ``PutCoro``.

    Requests coming from a coroutine are handled by
    ``single_task_registration_or_immediate_processing``; requests added
    through ``_add_direct_request`` are buffered in ``direct_requests`` and
    submitted by ``full_processing_iteration``.
    """

    def __init__(self, loop: CoroSchedulerType):
        super().__init__(loop)
        # Buffered coroutine lists waiting for full_processing_iteration().
        self.direct_requests: List[Tuple] = []

    def single_task_registration_or_immediate_processing(
        self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
    ) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
        """Submit every entry of ``coro_list`` on behalf of the calling coroutine.

        Args:
            coro_list: requests to submit; each one is called to obtain its
                ``(coro_worker, args, kwargs)`` triple.
            tasks: when true, ``put_task_from_other_service`` is used and its
                return value is recorded; otherwise ``put_from_other_service``
                is used and the ``coro_id`` of its return value is recorded.

        Returns:
            ``(True, outcomes, failure)``. ``outcomes`` holds one
            ``(result, exception)`` pair per processed request, with
            ``exception`` set (and ``result`` ``None``) when that request
            failed. ``failure`` is ``None`` unless something outside the
            per-request handling raised, in which case it is the value of
            ``get_exception()`` and ``outcomes`` holds whatever was collected
            so far.
        """
        outcomes = []
        failure = None
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
            for params in coro_list:
                try:
                    worker, worker_args, worker_kwargs = params()
                    if tasks:
                        submitted = put_coro.put_task_from_other_service(
                            caller_coro_id, worker, *worker_args, **worker_kwargs
                        )
                    else:
                        submitted = put_coro.put_from_other_service(
                            caller_coro_id, worker, *worker_args, **worker_kwargs
                        ).coro_id
                except:
                    # Bare except on purpose: a failing request is recorded, not propagated.
                    outcomes.append((None, get_exception()))
                else:
                    outcomes.append((submitted, None))
        except:
            failure = get_exception()

        return True, outcomes, failure

    def full_processing_iteration(self):
        """Submit every buffered direct request as a root coroutine.

        The buffer is swapped for a fresh empty container of the same type
        before processing. If a submission raises, the exception is logged
        through ``dlog`` (only when ``__debug__`` is true) and re-raised.
        ``make_dead()`` is called after all buffered requests were submitted.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        pending = self.direct_requests
        self.direct_requests = type(pending)()
        for batch in pending:
            for params in batch:
                worker, worker_args, worker_kwargs = params()
                try:
                    put_coro.put_root_from_other_service(worker, *worker_args, **worker_kwargs)
                except:
                    exc_type, exc_value, exc_traceback = get_exception_tripple()
                    if __debug__:
                        dlog(exc_type, exc_value, exc_traceback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[None]:
        """Buffer ``coro_list`` for the next iteration, call ``make_live()``, return ``(False, None)``."""
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Pass "are direct requests buffered?" through ``thrifty_in_work`` and return its answer."""
        return self.thrifty_in_work(bool(self.direct_requests))


def put_coro_list_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[CoroID]:
    """Send ``coro_list`` to ``PutCoroList`` via ``make_request_to_service_with_context``."""
    return make_request_to_service_with_context(context, PutCoroList, coro_list)


def try_put_coro_list_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[Optional[CoroID]]:
    """Send ``coro_list`` to ``PutCoroList`` via ``try_make_request_to_service_with_context``."""
    return try_make_request_to_service_with_context(context, PutCoroList, coro_list)


async def aput_coro_list_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[CoroID]:
    """Await ``amake_request_to_service_with_context`` for ``PutCoroList`` with ``coro_list``."""
    return await amake_request_to_service_with_context(context, PutCoroList, coro_list)


async def atry_put_coro_list_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[Optional[CoroID]]:
    """Await ``atry_make_request_to_service_with_context`` for ``PutCoroList`` with ``coro_list``."""
    return await atry_make_request_to_service_with_context(context, PutCoroList, coro_list)


def put_coro_list(
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[CoroID]:
    """Send ``coro_list`` to ``PutCoroList`` via ``make_request_to_service``."""
    return make_request_to_service(PutCoroList, coro_list)


def try_put_coro_list(
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[Optional[CoroID]]:
    """Send ``coro_list`` to ``PutCoroList`` via ``try_make_request_to_service``."""
    return try_make_request_to_service(PutCoroList, coro_list)


async def aput_coro_list(
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[CoroID]:
    """Await ``amake_request_to_service`` for ``PutCoroList`` with ``coro_list``."""
    return await amake_request_to_service(PutCoroList, coro_list)


async def atry_put_coro_list(
    coro_list: Sequence[PutSingleCoroParams],
) -> ValueExistence[Optional[CoroID]]:
    """Await ``atry_make_request_to_service`` for ``PutCoroList`` with ``coro_list``."""
    return await atry_make_request_to_service(PutCoroList, coro_list)
class PutSingleCoroParams:
    """Bundle of one coroutine worker plus the arguments it should be started with.

    Calling the instance yields the stored ``(coro_worker, args, kwargs)`` triple.
    """

    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
        # Everything is stored as received; nothing is validated or copied.
        self.coro_worker: AnyWorker = coro_worker
        self.args: Tuple = args
        self.kwargs: Dict = kwargs

    def __call__(self) -> Any:
        """Return the stored request as a ``(coro_worker, args, kwargs)`` tuple."""
        request = (self.coro_worker, self.args, self.kwargs)
        return request
class PutCoroList(TypedService[List[Tuple[Optional[CoroID], Optional[Exception]]]], ServiceWithADirectRequestMixin):
    """Service that submits a whole list of coroutines by delegating to ``PutCoro``.

    Requests coming from a coroutine are handled by
    ``single_task_registration_or_immediate_processing``; requests added
    through ``_add_direct_request`` are buffered in ``direct_requests`` and
    submitted by ``full_processing_iteration``.
    """

    def __init__(self, loop: CoroSchedulerType):
        super().__init__(loop)
        # Buffered coroutine lists waiting for full_processing_iteration().
        self.direct_requests: List[Tuple] = []

    def single_task_registration_or_immediate_processing(
        self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
    ) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
        """Submit every entry of ``coro_list`` on behalf of the calling coroutine.

        Args:
            coro_list: requests to submit; each one is called to obtain its
                ``(coro_worker, args, kwargs)`` triple.
            tasks: when true, ``put_task_from_other_service`` is used and its
                return value is recorded; otherwise ``put_from_other_service``
                is used and the ``coro_id`` of its return value is recorded.

        Returns:
            ``(True, outcomes, failure)``. ``outcomes`` holds one
            ``(result, exception)`` pair per processed request, with
            ``exception`` set (and ``result`` ``None``) when that request
            failed. ``failure`` is ``None`` unless something outside the
            per-request handling raised, in which case it is the value of
            ``get_exception()`` and ``outcomes`` holds whatever was collected
            so far.
        """
        outcomes = []
        failure = None
        try:
            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
            caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
            for params in coro_list:
                try:
                    worker, worker_args, worker_kwargs = params()
                    if tasks:
                        submitted = put_coro.put_task_from_other_service(
                            caller_coro_id, worker, *worker_args, **worker_kwargs
                        )
                    else:
                        submitted = put_coro.put_from_other_service(
                            caller_coro_id, worker, *worker_args, **worker_kwargs
                        ).coro_id
                except:
                    # Bare except on purpose: a failing request is recorded, not propagated.
                    outcomes.append((None, get_exception()))
                else:
                    outcomes.append((submitted, None))
        except:
            failure = get_exception()

        return True, outcomes, failure

    def full_processing_iteration(self):
        """Submit every buffered direct request as a root coroutine.

        The buffer is swapped for a fresh empty container of the same type
        before processing. If a submission raises, the exception is logged
        through ``dlog`` (only when ``__debug__`` is true) and re-raised.
        ``make_dead()`` is called after all buffered requests were submitted.
        """
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        pending = self.direct_requests
        self.direct_requests = type(pending)()
        for batch in pending:
            for params in batch:
                worker, worker_args, worker_kwargs = params()
                try:
                    put_coro.put_root_from_other_service(worker, *worker_args, **worker_kwargs)
                except:
                    exc_type, exc_value, exc_traceback = get_exception_tripple()
                    if __debug__:
                        dlog(exc_type, exc_value, exc_traceback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_list: Sequence[PutSingleCoroParams]) -> ValueExistence[None]:
        """Buffer ``coro_list`` for the next iteration, call ``make_live()``, return ``(False, None)``."""
        self.direct_requests.append(coro_list)
        self.make_live()
        return (False, None)

    def in_work(self) -> bool:
        """Pass "are direct requests buffered?" through ``thrifty_in_work`` and return its answer."""
        return self.thrifty_in_work(bool(self.direct_requests))
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
def single_task_registration_or_immediate_processing(
    self, coro_list: Sequence[PutSingleCoroParams], tasks: bool = False
) -> Tuple[bool, Union[List[CoroID], List[Task]], Any]:
    """Submit every entry of ``coro_list`` on behalf of the calling coroutine.

    Args:
        coro_list: requests to submit; each one is called to obtain its
            ``(coro_worker, args, kwargs)`` triple.
        tasks: when true, ``put_task_from_other_service`` is used and its
            return value is recorded; otherwise ``put_from_other_service``
            is used and the ``coro_id`` of its return value is recorded.

    Returns:
        ``(True, outcomes, failure)``. ``outcomes`` holds one
        ``(result, exception)`` pair per processed request, with
        ``exception`` set (and ``result`` ``None``) when that request
        failed. ``failure`` is ``None`` unless something outside the
        per-request handling raised, in which case it is the value of
        ``get_exception()`` and ``outcomes`` holds whatever was collected
        so far.
    """
    outcomes = []
    failure = None
    try:
        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
        caller_coro_id: CoroID = self.current_caller_coro_info.coro_id
        for params in coro_list:
            try:
                worker, worker_args, worker_kwargs = params()
                if tasks:
                    submitted = put_coro.put_task_from_other_service(
                        caller_coro_id, worker, *worker_args, **worker_kwargs
                    )
                else:
                    submitted = put_coro.put_from_other_service(
                        caller_coro_id, worker, *worker_args, **worker_kwargs
                    ).coro_id
            except:
                # Bare except on purpose: a failing request is recorded, not propagated.
                outcomes.append((None, get_exception()))
            else:
                outcomes.append((submitted, None))
    except:
        failure = get_exception()

    return True, outcomes, failure
def full_processing_iteration(self):
    """Submit every buffered direct request as a root coroutine.

    The buffer is swapped for a fresh empty container of the same type
    before processing. If a submission raises, the exception is logged
    through ``dlog`` (only when ``__debug__`` is true) and re-raised.
    ``make_dead()`` is called after all buffered requests were submitted.
    """
    put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
    pending = self.direct_requests
    self.direct_requests = type(pending)()
    for batch in pending:
        for params in batch:
            worker, worker_args, worker_kwargs = params()
            try:
                put_coro.put_root_from_other_service(worker, *worker_args, **worker_kwargs)
            except:
                exc_type, exc_value, exc_traceback = get_exception_tripple()
                if __debug__:
                    dlog(exc_type, exc_value, exc_traceback)
                raise

    self.make_dead()
def in_work(self) -> bool:
    """Pass "are direct requests buffered?" through ``thrifty_in_work`` and return its answer."""
    return self.thrifty_in_work(bool(self.direct_requests))
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
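Raises: NotImplementedError: presumably raised by the base Service implementation when a subclass does not override in_work() (to be confirmed against the base class).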
Returns: bool: the value returned by thrifty_in_work(result), where result is True while direct requests are queued.
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy