cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list.versions.v_0.kill_coro_list

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['KillSingleCoroParams', 'KillCoroList', 'kill_coro_list_on', 'try_kill_coro_list_on', 'akill_coro_list_on', 'atry_kill_coro_list_on', 'kill_coro_list', 'try_kill_coro_list', 'akill_coro_list', 'atry_kill_coro_list']
 38
 39
 40from cengal.parallel_execution.coroutines.coro_scheduler import *
 41from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 42from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 43from cengal.code_flow_control.smart_values import ValueExistence
 44from cengal.introspection.inspect import get_exception, get_exception_tripple
 45from typing import Sequence, Tuple, List, Optional, Any, cast, Dict, Union, Set
 46
 47
 48class KillSingleCoroParams:
 49    def __init__(self, coro_id: CoroID, tree: bool = False) -> None:
 50        self.coro_id: CoroID = coro_id
 51        self.tree: bool = tree
 52    
 53    def __call__(self, new_coro_id: Optional[CoroID] = None) -> Tuple:
 54        return self.coro_id if new_coro_id is None else new_coro_id
 55    
 56    def __bool__(self):
 57        return self.tree
 58
 59
 60class KillCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 61    def __init__(self, loop: CoroSchedulerType):
 62        super(KillCoroList, self).__init__(loop)
 63        self.direct_requests: List[Tuple] = list()
 64
 65    def single_task_registration_or_immediate_processing(
 66            self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
 67    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
 68        results = list()
 69        try:
 70            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 71            for request in coro_list:
 72                exception = None
 73                result = None
 74                try:
 75                    if isinstance(request, KillSingleCoroParams):
 76                        coro_id = request.coro_id
 77                        if request:
 78                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
 79                        
 80                        result = self._loop.kill_coro_by_id(coro_id)
 81                        if request:
 82                            for child in children:
 83                                self._loop.kill_coro_by_id(child)
 84                    else:
 85                        coro_id = request
 86                        result = self._loop.kill_coro_by_id(coro_id)
 87                except:
 88                    exception = get_exception()
 89                
 90                results.append((result, exception))
 91        except:
 92            return True, results, get_exception()
 93
 94        return True, results, None
 95
 96    def full_processing_iteration(self):
 97        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 98        direct_requests_buff = self.direct_requests
 99        self.direct_requests = type(direct_requests_buff)()
100        for coro_list in direct_requests_buff:
101            for request in coro_list:
102                exception = None
103                try:
104                    if isinstance(request, KillSingleCoroParams):
105                        coro_id = request.coro_id
106                        if request:
107                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
108                        
109                        self._loop.kill_coro_by_id(coro_id)
110                        if request:
111                            for child in children:
112                                self._loop.kill_coro_by_id(child)
113                    else:
114                        coro_id = request
115                        self._loop.kill_coro_by_id(coro_id)
116                except:
117                    ex_type, exception, tracback = get_exception_tripple()
118                    if __debug__: dlog(ex_type, exception, tracback)
119                    raise
120
121        self.make_dead()
122    
123    def _add_direct_request(self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[None]:
124        self.direct_requests.append(coro_list)
125        self.make_live()
126        return (False, None)
127
128    def in_work(self) -> bool:
129        result: bool = bool(self.direct_requests)
130        return self.thrifty_in_work(result)
131
132
def kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``make_request_to_service_with_context``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
135
136
def try_kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``try_make_request_to_service_with_context``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = try_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
139
140
async def akill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service_with_context`` for ``KillCoroList`` with ``coro_list``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await amake_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
143
144
async def atry_kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service_with_context`` for ``KillCoroList`` with ``coro_list``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await atry_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
147
148
def kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``make_request_to_service``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = make_request_to_service(KillCoroList, coro_list)
    return response
151
152
def try_kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``try_make_request_to_service``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = try_make_request_to_service(KillCoroList, coro_list)
    return response
155
156
async def akill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service`` for ``KillCoroList`` with ``coro_list``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await amake_request_to_service(KillCoroList, coro_list)
    return response
159
160
async def atry_kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service`` for ``KillCoroList`` with ``coro_list``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await atry_make_request_to_service(KillCoroList, coro_list)
    return response
class KillSingleCoroParams:
class KillSingleCoroParams:
    """Describe one kill request: the target coroutine and whether to kill its tree.

    Truthiness of an instance mirrors ``tree``; ``KillCoroList`` uses
    ``if request:`` to decide whether the children of ``coro_id`` are killed
    together with it.
    """

    def __init__(self, coro_id: CoroID, tree: bool = False) -> None:
        """Store the request parameters.

        Args:
            coro_id: ID of the coroutine to kill.
            tree: When true, the children of ``coro_id`` are killed as well.
        """
        self.coro_id: CoroID = coro_id
        self.tree: bool = tree

    def __call__(self, new_coro_id: Optional[CoroID] = None) -> CoroID:
        """Return ``new_coro_id`` when it is given, otherwise the stored ``coro_id``.

        The instance itself is not modified.
        """
        return self.coro_id if new_coro_id is None else new_coro_id

    def __bool__(self) -> bool:
        """Return the ``tree`` flag coerced to ``bool``."""
        # ``__bool__`` must return a real ``bool``; coercing keeps ``if request:``
        # working when a caller passed a merely truthy/falsy ``tree`` (e.g. 1 or None).
        return bool(self.tree)
KillSingleCoroParams(coro_id: int, tree: bool = False)
50    def __init__(self, coro_id: CoroID, tree: bool = False) -> None:
51        self.coro_id: CoroID = coro_id
52        self.tree: bool = tree
coro_id: int
tree: bool
class KillCoroList(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[typing.List[typing.Tuple[typing.Union[bool, NoneType], typing.Union[Exception, NoneType]]]], cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin):
 61class KillCoroList(TypedService[List[Tuple[Optional[bool], Optional[Exception]]]], ServiceWithADirectRequestMixin):
 62    def __init__(self, loop: CoroSchedulerType):
 63        super(KillCoroList, self).__init__(loop)
 64        self.direct_requests: List[Tuple] = list()
 65
 66    def single_task_registration_or_immediate_processing(
 67            self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
 68    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
 69        results = list()
 70        try:
 71            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 72            for request in coro_list:
 73                exception = None
 74                result = None
 75                try:
 76                    if isinstance(request, KillSingleCoroParams):
 77                        coro_id = request.coro_id
 78                        if request:
 79                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
 80                        
 81                        result = self._loop.kill_coro_by_id(coro_id)
 82                        if request:
 83                            for child in children:
 84                                self._loop.kill_coro_by_id(child)
 85                    else:
 86                        coro_id = request
 87                        result = self._loop.kill_coro_by_id(coro_id)
 88                except:
 89                    exception = get_exception()
 90                
 91                results.append((result, exception))
 92        except:
 93            return True, results, get_exception()
 94
 95        return True, results, None
 96
 97    def full_processing_iteration(self):
 98        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 99        direct_requests_buff = self.direct_requests
100        self.direct_requests = type(direct_requests_buff)()
101        for coro_list in direct_requests_buff:
102            for request in coro_list:
103                exception = None
104                try:
105                    if isinstance(request, KillSingleCoroParams):
106                        coro_id = request.coro_id
107                        if request:
108                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
109                        
110                        self._loop.kill_coro_by_id(coro_id)
111                        if request:
112                            for child in children:
113                                self._loop.kill_coro_by_id(child)
114                    else:
115                        coro_id = request
116                        self._loop.kill_coro_by_id(coro_id)
117                except:
118                    ex_type, exception, tracback = get_exception_tripple()
119                    if __debug__: dlog(ex_type, exception, tracback)
120                    raise
121
122        self.make_dead()
123    
124    def _add_direct_request(self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]) -> ValueExistence[None]:
125        self.direct_requests.append(coro_list)
126        self.make_live()
127        return (False, None)
128
129    def in_work(self) -> bool:
130        result: bool = bool(self.direct_requests)
131        return self.thrifty_in_work(result)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

KillCoroList( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
62    def __init__(self, loop: CoroSchedulerType):
63        super(KillCoroList, self).__init__(loop)
64        self.direct_requests: List[Tuple] = list()
direct_requests: List[Tuple]
def single_task_registration_or_immediate_processing( self, coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> Tuple[bool, Sequence[Tuple[bool, Union[Exception, NoneType]]], Any]:
66    def single_task_registration_or_immediate_processing(
67            self, coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
68    ) -> Tuple[bool, Sequence[Tuple[bool, Optional[Exception]]], Any]:
69        results = list()
70        try:
71            put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
72            for request in coro_list:
73                exception = None
74                result = None
75                try:
76                    if isinstance(request, KillSingleCoroParams):
77                        coro_id = request.coro_id
78                        if request:
79                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
80                        
81                        result = self._loop.kill_coro_by_id(coro_id)
82                        if request:
83                            for child in children:
84                                self._loop.kill_coro_by_id(child)
85                    else:
86                        coro_id = request
87                        result = self._loop.kill_coro_by_id(coro_id)
88                except:
89                    exception = get_exception()
90                
91                results.append((result, exception))
92        except:
93            return True, results, get_exception()
94
95        return True, results, None
def full_processing_iteration(self):
 97    def full_processing_iteration(self):
 98        put_coro: PutCoro = self._loop.get_service_instance(PutCoro)
 99        direct_requests_buff = self.direct_requests
100        self.direct_requests = type(direct_requests_buff)()
101        for coro_list in direct_requests_buff:
102            for request in coro_list:
103                exception = None
104                try:
105                    if isinstance(request, KillSingleCoroParams):
106                        coro_id = request.coro_id
107                        if request:
108                            children: Set[CoroID] = put_coro.get_set_of_all_children(coro_id)
109                        
110                        self._loop.kill_coro_by_id(coro_id)
111                        if request:
112                            for child in children:
113                                self._loop.kill_coro_by_id(child)
114                    else:
115                        coro_id = request
116                        self._loop.kill_coro_by_id(coro_id)
117                except:
118                    ex_type, exception, tracback = get_exception_tripple()
119                    if __debug__: dlog(ex_type, exception, tracback)
120                    raise
121
122        self.make_dead()
def in_work(self) -> bool:
129    def in_work(self) -> bool:
130        result: bool = bool(self.direct_requests)
131        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

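Raises: NotImplementedError: presumably raised by the base Service implementation when a subclass does not override in_work() (the base class is not shown here); KillCoroList.in_work() itself does not raise it.

Returns: bool: for KillCoroList, the value of thrifty_in_work(bool(self.direct_requests)), i.e. derived from whether direct requests are queued.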

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
def kill_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
def kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``make_request_to_service_with_context``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
def try_kill_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
def try_kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``try_make_request_to_service_with_context``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = try_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
async def akill_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
async def akill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service_with_context`` for ``KillCoroList`` with ``coro_list``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await amake_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
async def atry_kill_coro_list_on( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
async def atry_kill_coro_list_on(
        context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service_with_context`` for ``KillCoroList`` with ``coro_list``.

    Args:
        context: Passed unchanged as the first argument of the request helper.
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await atry_make_request_to_service_with_context(context, KillCoroList, coro_list)
    return response
def kill_coro_list( coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
def kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``make_request_to_service``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = make_request_to_service(KillCoroList, coro_list)
    return response
def try_kill_coro_list( coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
def try_kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Send ``coro_list`` to ``KillCoroList`` via ``try_make_request_to_service``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the request helper returns.
    """
    response = try_make_request_to_service(KillCoroList, coro_list)
    return response
async def akill_coro_list( coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]]]:
async def akill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Sequence[Tuple[bool, Optional[Exception]]]]:
    """Await ``amake_request_to_service`` for ``KillCoroList`` with ``coro_list``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await amake_request_to_service(KillCoroList, coro_list)
    return response
async def atry_kill_coro_list( coro_list: typing.Sequence[typing.Union[int, KillSingleCoroParams]]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[typing.Sequence[typing.Tuple[bool, typing.Union[Exception, NoneType]]], NoneType]]:
async def atry_kill_coro_list(
        coro_list: Sequence[Union[CoroID, KillSingleCoroParams]]
) -> ValueExistence[Optional[Sequence[Tuple[bool, Optional[Exception]]]]]:
    """Await ``atry_make_request_to_service`` for ``KillCoroList`` with ``coro_list``.

    Args:
        coro_list: Coroutine IDs and/or ``KillSingleCoroParams`` describing what to kill.

    Returns:
        Whatever the awaited request helper returns.
    """
    response = await atry_make_request_to_service(KillCoroList, coro_list)
    return response