cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro

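Coroutine-creation service for the Cengal coroutine scheduler: the `PutCoro` service, its `PutCoroRequest` request class, the `Task` completion handle, and module-level helpers for putting coroutines and inspecting the coroutine parent/child tree. Docstring conventions: http://www.python.org/dev/peps/pep-0257/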

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
# Package metadata.
__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# Exactly one status line is active; the alternatives are kept commented out.
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


# Public API of the module, in order: the Task handle and its exceptions,
# the service and its request class, helpers for other services, the
# put-coroutine helpers (sync/async, strict/try), and the coroutine-tree
# inspection helpers.
__all__ = [
    'TaskIsNotFinished', 'TaskIsFinished', 'Task', 
    'PutCoro', 'PutCoroRequest', 'put_current_from_other_service', 'put_from_other_service', 'put_root_from_other_service', 
    'put_coro_to', 'try_put_coro_to', 'aput_coro_to', 'atry_put_coro_to', 'put_coro', 'try_put_coro', 'aput_coro', 
    'atry_put_coro', 'travers_through_all_coro_children_on', 'try_travers_through_all_coro_children_on', 
    'travers_through_all_coro_children', 'try_travers_through_all_coro_children', 'get_set_of_all_coro_children_on', 
    'try_get_set_of_all_coro_children_on', 'get_set_of_all_coro_children', 'try_get_set_of_all_coro_children', 
    'travers_through_all_coro_parents_on', 'try_travers_through_all_coro_parents_on', 'travers_through_all_coro_parents', 
    'try_travers_through_all_coro_parents', 'get_set_of_all_coro_parents_on', 'try_get_set_of_all_coro_parents_on', 
    'get_set_of_all_coro_parents', 'try_get_set_of_all_coro_parents', 'get_set_of_all_coros_on', 
    'try_get_set_of_all_coros_on', 'get_set_of_all_coros', 'try_get_set_of_all_coros']
 48
 49from cengal.parallel_execution.coroutines.coro_scheduler import *
 50from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import *
 51from cengal.code_flow_control.smart_values import ValueExistence
 52from cengal.introspection.inspect import get_exception, get_exception_tripple
 53from cengal.data_manipulation.tree_traversal import KeyMultiValueTreeTraversal, KeyValueTreeTraversal
 54from typing import Hashable, Tuple, List, Optional, Any, Union, cast, Dict, Callable, Set
 55
 56
 57class TaskIsNotFinished(Exception):
 58    pass
 59
 60
 61class TaskIsFinished(Exception):
 62    pass
 63
 64
 65class Task:
 66    def __init__(self, coro_wrapper: CoroWrapperBase):
 67        self._coro_wrapper = coro_wrapper
 68        self._coro_id = coro_wrapper.coro_id
 69        self._coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler)
 70        self._done: bool = False
 71        self._result: Any = None
 72        self._exception: Exception = None
 73        self._on_done_handlers: Set[Callable[['Task'], None]] = set()
 74    
 75    def _on_coro_del_handler(self, coro_wrapper: CoroWrapperBase) -> bool:
 76        self._result = self._coro_wrapper.last_result
 77        self._exception = self._coro_wrapper.exception
 78        self._coro_wrapper = None
 79        self._done = True
 80        for handler in self._on_done_handlers:
 81            handler(self)
 82        
 83        return True
 84    
 85    @property
 86    def coro_id(self):
 87        return self._coro_id
 88    
 89    @property
 90    def coro_wrapper(self):
 91        if self._done:
 92            raise TaskIsFinished()
 93
 94        return self._coro_wrapper
 95    
 96    @property
 97    def result(self):
 98        if not self._done:
 99            raise TaskIsNotFinished()
100        
101        if self._exception is not None:
102            raise self._exception
103        
104        return self._result
105    
106    def done(self) -> bool:
107        return self._done
108    
109    def __bool__(self) -> bool:
110        return self._done
111
112    def __nonzero__(self):
113        return self.__bool__()
114    
115    def add_on_done_handler(self, handler: Callable[['Task'], None]) -> bool:
116        if self._done:
117            handler(self)
118        else:
119            self._on_done_handlers.add(handler)
120        
121        return True
122
123
class PutCoroRequest(ServiceRequest):
    """Request builder for the ``PutCoro`` service.

    Every method stores a numeric request type followed by its arguments
    through ``self._save`` and returns whatever ``_save`` returns (annotated
    as ``ServiceRequest`` throughout this class; presumably the request
    object itself - confirm against ``ServiceRequest._save``).

    Request types used here: 0 and 1 control tree monitoring, 2 and 3 are
    reserved for children handlers, 4 puts a background coroutine and 5
    creates a ``Task``.
    """

    def turn_on_tree_monitoring(self, turn_on: bool) -> ServiceRequest:
        """Request type 0: switch coroutine-tree monitoring on or off."""
        return self._save(0, turn_on)

    def tree_monitoring_state(self) -> ServiceRequest:
        """Request type 1: query whether coroutine-tree monitoring is on."""
        return self._save(1)

    # TODO: An implementation required:
    # NOTE(review): PutCoro._request_workers has no entry for request type 2.
    def set_on_children_start_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request type 2: set a handler for children start of ``parent_coro_id``."""
        return self._save(2, parent_coro_id, handler)

    # TODO: An implementation required:
    # NOTE(review): PutCoro._request_workers has no entry for request type 3.
    def set_on_children_del_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request type 3: set a handler for children deletion of ``parent_coro_id``."""
        return self._save(3, parent_coro_id, handler)

    def put_background_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 4: create a coroutine flagged as a background coroutine."""
        return self._save(4, coro_worker, args, kwargs)

    def task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 5: create a coroutine and get a ``Task`` for it (background flag ``False``)."""
        return self._save(5, coro_worker, args, kwargs, False)

    def background_task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 5: create a coroutine and get a ``Task`` for it (background flag ``True``)."""
        return self._save(5, coro_worker, args, kwargs, True)
147
148
class PutCoro(ServiceWithADirectRequestMixin, DualImmediateProcessingServiceMixin, TypedService[CoroID]):
    """Service that creates new coroutines in the scheduler loop.

    On top of plain creation the service can keep a parent/child tree of the
    coroutines it created ("tree monitoring", off by default) and it accepts
    "direct requests" which are queued and executed on the next
    ``full_processing_iteration``.

    Request workers return ``(True, result, exception)`` triples; the meaning
    of the first element is defined by the scheduler and is not visible in
    this module.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(PutCoro, self).__init__(loop)
        self._tree_monitoring_turned_on: Optional[bool] = None
        # Both callables are (re)bound by _turn_on_tree_monitoring().
        self._single_task_registration_or_immediate_processing_single_impl: Optional[Callable] = None
        self._single_task_registration_or_immediate_processing_single_impl_impl: Optional[Callable] = None
        self._turn_on_tree_monitoring(False)
        self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = dict()
        self._tree_parent_by_child: Dict[CoroID, CoroID] = dict()
        # Ids reported by _on_coro_del_handler and not yet removed from the tree.
        self._dead_coroutines: Set[CoroID] = set()

        # Dispatch table keyed by the request type saved by PutCoroRequest.
        self._request_workers = {
            0: self._turn_on_tree_monitoring,
            1: self._tree_monitoring_state,
            4: self._put_background_coro,
            5: self._task,
        }

        # Pending (coro_worker, args, kwargs) triples added by _add_direct_request().
        self.direct_requests: List[Tuple] = list()
    
    def single_task_registration_or_immediate_processing_single(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> Tuple[bool, Optional[CoroID], Any]:
        """Create a coroutine for the calling coroutine and report its id.

        Returns:
            ``(True, coro_id, None)`` on success or ``(True, None, exception)``
            when the creation raised.
        """
        try:
            coro = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
        except:
            # Bare except kept on purpose: every failure is reported to the
            # requester through the result triple instead of propagating.
            return True, None, get_exception()

        return True, coro.coro_id, None
    
    def _single_task_registration_or_immediate_processing_single_impl_with_tree(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Tree-monitoring variant: the calling coroutine becomes the parent."""
        return self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs)
    
    def _single_task_registration_or_immediate_processing_single_impl_with_tree_impl(
            self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Create a coroutine and register it in the tree as a child of ``parent_coro_id``."""
        coro: CoroWrapperBase = self._loop.put_coro(coro_worker, *args, **kwargs)
        child_coro_id: CoroID = coro.coro_id
        if parent_coro_id not in self._tree_children_by_parent:
            self._tree_children_by_parent[parent_coro_id] = set()
        
        self._tree_children_by_parent[parent_coro_id].add(child_coro_id)
        self._tree_parent_by_child[child_coro_id] = parent_coro_id
        # The tree is updated lazily once the child is deleted.
        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        return coro
    
    def _single_task_registration_or_immediate_processing_single_impl_without_tree(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Variant used while tree monitoring is off; the caller id is passed on but unused."""
        return self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs)
    
    def _single_task_registration_or_immediate_processing_single_impl_without_tree_impl(
            self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Create a coroutine without touching the tree; ``parent_coro_id`` is ignored."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)
    
    def _turn_on_tree_monitoring(self, turn_on: bool):
        """Select the with-tree or without-tree creation functions.

        Returns:
            ``(True, previous_state, None)`` where ``previous_state`` is the
            monitoring flag before this call (``None`` on the very first call
            made from ``__init__``).
        """
        previous_state: Optional[bool] = self._tree_monitoring_turned_on
        self._tree_monitoring_turned_on = turn_on
        if turn_on:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl
        else:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl
        
        return True, previous_state, None
    
    def put_from_other_service(
            self, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
        ) -> CoroWrapperBase:
        """Create a coroutine on behalf of ``coro_id`` (its parent when tree monitoring is on)."""
        return self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
    
    def put_task_from_other_service(
            self, coro_id: CoroID, coro_worker: AnyWorker, args, kwargs
        ) -> Task:
        """Like ``put_from_other_service`` but takes ``args``/``kwargs`` as objects and returns a ``Task``."""
        coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
        return Task(coro)
    
    def put_root_from_other_service(
            self, coro_worker: AnyWorker, *args, **kwargs
        ) -> CoroWrapperBase:
        """Create a coroutine directly in the loop, never registering it in the tree."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)
    
    def _tree_monitoring_state(self):
        """Request worker 1: return ``(True, monitoring_flag, None)``."""
        return True, self._tree_monitoring_turned_on, None
    
    def _put_background_coro(self, coro_worker: AnyWorker, args, kwargs):
        """Request worker 4: create a coroutine and flag it as a background one.

        Returns:
            ``(True, coro_id, None)`` on success or ``(True, None, exception)``.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = True
        except:
            # Bare except kept on purpose: failures travel in the result triple.
            return True, None, get_exception()

        return True, coro.coro_id, None
    
    def _task(self, coro_worker: AnyWorker, args: Tuple, kwargs: Dict, background: bool):
        """Request worker 5: create a coroutine and wrap it into a ``Task``.

        Returns:
            ``(True, task, None)`` on success or ``(True, None, exception)``.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = background
        except:
            # Bare except kept on purpose: failures travel in the result triple.
            return True, None, get_exception()

        return True, Task(coro), None

    def full_processing_iteration(self):
        """Remove dead coroutines from the tree, then run the queued direct requests.

        For every dead coroutine its own entries are dropped from both tree
        maps. If it had a parent, its children are re-attached to that parent;
        otherwise the children lose their parent link. Finally the service
        marks itself dead via ``make_dead()``.
        """
        if self._dead_coroutines:
            dead_coroutines_buff = self._dead_coroutines
            self._dead_coroutines = type(dead_coroutines_buff)()
            for dead_coro_id in dead_coroutines_buff:
                # pop() instead of get(): entries of a dead coroutine must not
                # stay in the maps, otherwise both dictionaries grow forever
                # and get_set_of_all_coros() keeps reporting dead ids.
                parent_coro_id = self._tree_parent_by_child.pop(dead_coro_id, None)
                children = self._tree_children_by_parent.pop(dead_coro_id, None)
                if parent_coro_id is None:
                    if children is not None:
                        for child_coro_id in children:
                            self._tree_parent_by_child.pop(child_coro_id, None)
                else:
                    sibblings = self._tree_children_by_parent[parent_coro_id]
                    sibblings.discard(dead_coro_id)
                    if children is not None:
                        # Orphans are adopted by their grandparent.
                        sibblings.update(children)
                        for child_coro_id in children:
                            self._tree_parent_by_child[child_coro_id] = parent_coro_id

        if self.direct_requests:
            direct_requests_buff = self.direct_requests
            self.direct_requests = type(direct_requests_buff)()
            for coro_worker, args, kwargs in direct_requests_buff:
                try:
                    self._loop.put_coro(coro_worker, *args, **kwargs)
                except:
                    # NOTE(review): the requests still left in this batch are
                    # dropped when one of them raises - confirm this is intended.
                    ex_type, exception, tracback = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, tracback)
                    raise

        self.make_dead()
    
    def _add_direct_request(self, coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[None]:
        """Queue a coroutine creation for the next iteration and wake the service.

        Returns:
            ``(False, None)``: no value is available at call time.
        """
        self.direct_requests.append((coro_worker, args, kwargs))
        self.make_live()
        return (False, None)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Remember a deleted tree member; the tree itself is fixed in ``full_processing_iteration``."""
        self._dead_coroutines.add(coro.coro_id)
        self.make_live()
        return False
    
    def travers_through_all_children(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run a ``KeyMultiValueTreeTraversal`` over the children map starting at ``coro_id``."""
        t = KeyMultiValueTreeTraversal(self._tree_children_by_parent, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)
    
    def get_set_of_all_children(self, coro_id) -> Set[CoroID]:
        """Collect the ids reported by the children traversal of ``coro_id``, excluding ``coro_id`` itself."""
        result = set()
        def handler(deep, parent, child: ValueExistence[CoroID], index):
            # The traversal passes the child as a value-existence pair; the
            # id is its second element.
            if child:
                result.add(child[1])
        
        self.travers_through_all_children(coro_id, handler)
        result.discard(coro_id)
        return result
    
    def travers_through_all_parents(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run a ``KeyValueTreeTraversal`` over the parent map starting at ``coro_id``."""
        t = KeyValueTreeTraversal(self._tree_parent_by_child, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)
    
    def get_set_of_all_parents(self, coro_id) -> Set[CoroID]:
        """Collect the ids reported by the parents traversal of ``coro_id``, excluding ``coro_id`` itself."""
        result = set()
        def handler(deep, child, parent, index):
            if parent is not None:
                result.add(parent)
        
        self.travers_through_all_parents(coro_id, handler)
        result.discard(coro_id)
        return result
    
    def get_set_of_all_coros(self) -> Set[CoroID]:
        """Return every id currently present in the tree, as a parent or as a child."""
        result = set(self._tree_children_by_parent.keys())
        result.update(self._tree_parent_by_child.keys())
        return result

    def in_work(self) -> bool:
        """Report pending work: queued direct requests or unprocessed dead coroutines."""
        result: bool = bool(self.direct_requests) or bool(self._dead_coroutines)
        return self.thrifty_in_work(result)
334
335
# Bind the request class to the service that handles it. Done after both
# classes exist because PutCoro is defined below PutCoroRequest.
# (Presumably read by the scheduler when routing a request - TODO confirm.)
PutCoroRequest.default_service_type = PutCoro
337
338
def put_current_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Create a coroutine from inside another service on behalf of that service's current caller.

    The ``PutCoro`` instance is taken from ``current_service._loop`` and the
    new coroutine is put with the id of the coroutine currently served by
    ``current_service``.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    caller_coro_id = current_service.current_caller_coro_info.coro_id
    return service.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
342
343
def put_from_other_service(current_service: Service, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Create a coroutine from inside another service on behalf of an explicit ``coro_id``.

    The ``PutCoro`` instance is taken from ``current_service._loop``.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    new_coro = service.put_from_other_service(coro_id, coro_worker, *args, **kwargs)
    return new_coro
347
348
def put_root_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Create a coroutine from inside another service without attaching it to any parent.

    The ``PutCoro`` instance is taken from ``current_service._loop``.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    new_coro = service.put_root_from_other_service(coro_worker, *args, **kwargs)
    return new_coro
352
353
def put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Ask the PutCoro service reachable through ``context`` to start ``coro_worker``.

    ``context`` can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

    An example::

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            try:
                result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
                if result:
                    print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                    resulting_coro_id = result.value
                else:
                    print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
            except CoroSchedulerContextIsNotAvailable:
                print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): scheduler context produced by one of the functions listed above.
        coro_worker (AnyWorker): worker of the coroutine to create; ``*args`` and ``**kwargs`` are forwarded with it.

    Returns:
        ValueExistence[Optional[CoroID]]: the outcome of ``make_request_to_service_with_context``. As in the example, a truthy result carries the id of the already created coroutine in ``.value``.
    """
    request_outcome = make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
386
387
def try_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Non-raising counterpart of ``put_coro_to``: ask the PutCoro service to start ``coro_worker``.

    ``context`` can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

    An example::

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            result = try_put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
            if result:
                print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                resulting_coro_id = result.value
            else:
                print('There are two possibilities:')
                print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
                print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): scheduler context produced by one of the functions listed above.
        coro_worker (AnyWorker): worker of the coroutine to create; ``*args`` and ``**kwargs`` are forwarded with it.

    Returns:
        ValueExistence[Optional[CoroID]]: the outcome of ``try_make_request_to_service_with_context``. As in the example, a truthy result carries the id of the already created coroutine in ``.value``.
    """
    request_outcome = try_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
419
420
async def aput_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]:
    """Awaitable variant of ``put_coro_to``; delegates to ``amake_request_to_service_with_context``."""
    request_outcome = await amake_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
423
424
async def atry_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Awaitable variant of ``try_put_coro_to``; delegates to ``atry_make_request_to_service_with_context``."""
    request_outcome = await atry_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
427
428
def put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]:
    """Ask the PutCoro service to start ``coro_worker``; delegates to ``make_request_to_service``."""
    request_outcome = make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
431
432
def try_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Ask the PutCoro service to start ``coro_worker``; delegates to ``try_make_request_to_service``."""
    request_outcome = try_make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
435
436
async def aput_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]:
    """Awaitable variant of ``put_coro``; delegates to ``amake_request_to_service``."""
    request_outcome = await amake_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
439
440
async def atry_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Awaitable variant of ``try_put_coro``; delegates to ``atry_make_request_to_service``."""
    request_outcome = await atry_make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return request_outcome
443
444
445# ==================================================
446
447
def travers_through_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the children tree of ``coro_id`` kept by the PutCoro service of the given scheduler.

    The service is resolved with ``service_with_explicit_loop``. ``handler``
    and ``on_switched_to_stack_based_implementation`` are forwarded unchanged
    to ``PutCoro.travers_through_all_children``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation)
451
452
def try_travers_through_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the children tree of ``coro_id`` if a PutCoro service can be resolved.

    The service is looked up with ``get_service_with_explicit_loop``; when the
    lookup yields ``None`` nothing is traversed.

    Returns:
        None: in both cases; the traversal reports through ``handler`` only.
    """
    service: Optional[PutCoro] = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation)
457
458
def travers_through_all_coro_children(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the children tree of ``coro_id`` using the scheduler returned by ``get_available_coro_scheduler()``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    scheduler = get_available_coro_scheduler()
    return travers_through_all_coro_children_on(scheduler, coro_id, handler, on_switched_to_stack_based_implementation)
461
462
def try_travers_through_all_coro_children(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Non-raising lookup variant of ``travers_through_all_coro_children``.

    Uses the scheduler returned by ``get_available_coro_scheduler()`` and
    delegates to ``try_travers_through_all_coro_children_on``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    scheduler = get_available_coro_scheduler()
    return try_travers_through_all_coro_children_on(scheduler, coro_id, handler, on_switched_to_stack_based_implementation)
465
466
467# ==================================================
468
469
def get_set_of_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Set[CoroID]:
    """Return ``PutCoro.get_set_of_all_children(coro_id)`` for the service resolved by ``service_with_explicit_loop``."""
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    children_ids = service.get_set_of_all_children(coro_id)
    return children_ids
473
474
def try_get_set_of_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Optional[Set[CoroID]]:
    """Return the children set of ``coro_id``, or ``None`` when ``get_service_with_explicit_loop`` yields no service."""
    service = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.get_set_of_all_children(coro_id)
479
480
def get_set_of_all_coro_children(coro_id) -> Set[CoroID]:
    """Return the children set of ``coro_id`` using the scheduler from ``get_available_coro_scheduler()``."""
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coro_children_on(scheduler, coro_id)
483
484
def try_get_set_of_all_coro_children(coro_id) -> Optional[Set[CoroID]]:
    """Non-raising lookup variant of ``get_set_of_all_coro_children``; delegates to ``try_get_set_of_all_coro_children_on``."""
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coro_children_on(scheduler, coro_id)
487
488
489# ==================================================
490
491
def travers_through_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the parent chain of ``coro_id`` kept by the PutCoro service of the given scheduler.

    The service is resolved with ``service_with_explicit_loop``. ``handler``
    and ``on_switched_to_stack_based_implementation`` are forwarded unchanged
    to ``PutCoro.travers_through_all_parents``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation)
495
496
def try_travers_through_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the parent chain of ``coro_id`` if a PutCoro service can be resolved.

    The service is looked up with ``get_service_with_explicit_loop``; when the
    lookup yields ``None`` nothing is traversed.

    Returns:
        None: in both cases; the traversal reports through ``handler`` only.
    """
    service: Optional[PutCoro] = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation)
501
502
def travers_through_all_coro_parents(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Traverse the parent chain of ``coro_id`` using the scheduler returned by ``get_available_coro_scheduler()``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    scheduler = get_available_coro_scheduler()
    return travers_through_all_coro_parents_on(scheduler, coro_id, handler, on_switched_to_stack_based_implementation)
505
506
def try_travers_through_all_coro_parents(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable] = None) -> None:
    """Non-raising lookup variant of ``travers_through_all_coro_parents``.

    Uses the scheduler returned by ``get_available_coro_scheduler()`` and
    delegates to ``try_travers_through_all_coro_parents_on``.

    Returns:
        None: the traversal reports through ``handler`` only.
    """
    scheduler = get_available_coro_scheduler()
    return try_travers_through_all_coro_parents_on(scheduler, coro_id, handler, on_switched_to_stack_based_implementation)
509
510
511# ==================================================
512
513
def get_set_of_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Set[CoroID]:
    """Return the set of parent ids of ``coro_id`` from the PutCoro service of the given scheduler.

    The service is resolved with ``service_with_explicit_loop``; the result is
    what ``PutCoro.get_set_of_all_parents`` builds, i.e. a plain ``set``.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.get_set_of_all_parents(coro_id)
517
518
def try_get_set_of_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Optional[Set[CoroID]]:
    """Return the set of parent ids of ``coro_id``, or ``None`` when no PutCoro service is resolved.

    The service is looked up with ``get_service_with_explicit_loop``.
    """
    service: Optional[PutCoro] = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.get_set_of_all_parents(coro_id)
523
524
def get_set_of_all_coro_parents(coro_id) -> Set[CoroID]:
    """Return the set of parent ids of ``coro_id`` using the scheduler from ``get_available_coro_scheduler()``."""
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coro_parents_on(scheduler, coro_id)
527
528
def try_get_set_of_all_coro_parents(coro_id) -> Optional[Set[CoroID]]:
    """Non-raising lookup variant of ``get_set_of_all_coro_parents``; ``None`` when no PutCoro service is resolved."""
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coro_parents_on(scheduler, coro_id)
531
532
533# ==================================================
534
535
def get_set_of_all_coros_on(prioritized_coro_scheduler: Optional[CoroSchedulerType]) -> Set[CoroID]:
    """Return every coroutine id present in the tree of the PutCoro service of the given scheduler.

    The service is resolved with ``service_with_explicit_loop``; the result is
    the plain ``set`` built by ``PutCoro.get_set_of_all_coros``.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.get_set_of_all_coros()
539
540
def try_get_set_of_all_coros_on(prioritized_coro_scheduler: Optional[CoroSchedulerType]) -> Optional[Set[CoroID]]:
    """Return every coroutine id present in the tree, or ``None`` when no PutCoro service is resolved.

    The service is looked up with ``get_service_with_explicit_loop``.
    """
    service: Optional[PutCoro] = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.get_set_of_all_coros()
545
546
def get_set_of_all_coros() -> Set[CoroID]:
    """Return every coroutine id present in the tree, using the scheduler from ``get_available_coro_scheduler()``."""
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coros_on(scheduler)
549
550
def try_get_set_of_all_coros() -> Optional[Set[CoroID]]:
    """Non-raising lookup variant of ``get_set_of_all_coros``; ``None`` when no PutCoro service is resolved."""
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coros_on(scheduler)
class TaskIsNotFinished(builtins.Exception):
class TaskIsNotFinished(Exception):
    """Raised by ``Task.result`` when the task has not completed yet."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class TaskIsFinished(builtins.Exception):
class TaskIsFinished(Exception):
    """Raised by ``Task.coro_wrapper`` once the task has already completed."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Task:
 66class Task:
 67    def __init__(self, coro_wrapper: CoroWrapperBase):
 68        self._coro_wrapper = coro_wrapper
 69        self._coro_id = coro_wrapper.coro_id
 70        self._coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler)
 71        self._done: bool = False
 72        self._result: Any = None
 73        self._exception: Exception = None
 74        self._on_done_handlers: Set[Callable[['Task'], None]] = set()
 75    
 76    def _on_coro_del_handler(self, coro_wrapper: CoroWrapperBase) -> bool:
 77        self._result = self._coro_wrapper.last_result
 78        self._exception = self._coro_wrapper.exception
 79        self._coro_wrapper = None
 80        self._done = True
 81        for handler in self._on_done_handlers:
 82            handler(self)
 83        
 84        return True
 85    
 86    @property
 87    def coro_id(self):
 88        return self._coro_id
 89    
 90    @property
 91    def coro_wrapper(self):
 92        if self._done:
 93            raise TaskIsFinished()
 94
 95        return self._coro_wrapper
 96    
 97    @property
 98    def result(self):
 99        if not self._done:
100            raise TaskIsNotFinished()
101        
102        if self._exception is not None:
103            raise self._exception
104        
105        return self._result
106    
107    def done(self) -> bool:
108        return self._done
109    
110    def __bool__(self) -> bool:
111        return self._done
112
113    def __nonzero__(self):
114        return self.__bool__()
115    
116    def add_on_done_handler(self, handler: Callable[['Task'], None]) -> bool:
117        if self._done:
118            handler(self)
119        else:
120            self._on_done_handlers.add(handler)
121        
122        return True
Task( coro_wrapper: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase)
67    def __init__(self, coro_wrapper: CoroWrapperBase):
68        self._coro_wrapper = coro_wrapper
69        self._coro_id = coro_wrapper.coro_id
70        self._coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler)
71        self._done: bool = False
72        self._result: Any = None
73        self._exception: Exception = None
74        self._on_done_handlers: Set[Callable[['Task'], None]] = set()
coro_id
86    @property
87    def coro_id(self):
88        return self._coro_id
coro_wrapper
90    @property
91    def coro_wrapper(self):
92        if self._done:
93            raise TaskIsFinished()
94
95        return self._coro_wrapper
result
 97    @property
 98    def result(self):
 99        if not self._done:
100            raise TaskIsNotFinished()
101        
102        if self._exception is not None:
103            raise self._exception
104        
105        return self._result
def done(self) -> bool:
107    def done(self) -> bool:
108        return self._done
def add_on_done_handler( self, handler: collections.abc.Callable[Task, NoneType]) -> bool:
116    def add_on_done_handler(self, handler: Callable[['Task'], None]) -> bool:
117        if self._done:
118            handler(self)
119        else:
120            self._on_done_handlers.add(handler)
121        
122        return True
class PutCoro(cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request.versions.v_0.service_with_a_direct_request.ServiceWithADirectRequestMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[int]):
class PutCoro(ServiceWithADirectRequestMixin, DualImmediateProcessingServiceMixin, TypedService[CoroID]):
    """Service that puts new coroutines into the scheduler loop.

    Optionally it maintains a parent/child tree of the coroutines it created
    ("tree monitoring"); monitoring is switched off on construction.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(PutCoro, self).__init__(loop)
        self._tree_monitoring_turned_on: bool = None
        # Both dispatch slots are filled by `_turn_on_tree_monitoring()` right below.
        self._single_task_registration_or_immediate_processing_single_impl: Callable = None
        self._single_task_registration_or_immediate_processing_single_impl_impl: Callable = None
        self._turn_on_tree_monitoring(False)
        self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = dict()
        self._tree_parent_by_child: Dict[CoroID, CoroID] = dict()
        # Ids collected by `_on_coro_del_handler`; drained in `full_processing_iteration()`.
        self._dead_coroutines: Set[CoroID] = set()

        # Keys are the request type codes saved by `PutCoroRequest`.
        # Codes 2 and 3 (children start/del handlers) have no worker yet.
        self._request_workers = {
            0: self._turn_on_tree_monitoring,
            1: self._tree_monitoring_state,
            4: self._put_background_coro,
            5: self._task,
        }

        # Pending (coro_worker, args, kwargs) tuples added by `_add_direct_request()`.
        self.direct_requests: List[Tuple] = list()

    def single_task_registration_or_immediate_processing_single(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> Tuple[bool, Optional[CoroID], Any]:
        """Create a coroutine for ``coro_worker`` and answer immediately.

        Returns:
            ``(True, coro_id, None)`` on success or ``(True, None, exception)``
            when the creation failed.
        """
        try:
            coro = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
        except:
            return True, None, get_exception()

        return True, coro.coro_id, None

    def _single_task_registration_or_immediate_processing_single_impl_with_tree(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Tree-aware creation: the current caller coroutine becomes the parent."""
        return self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs)

    def _single_task_registration_or_immediate_processing_single_impl_with_tree_impl(
            self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Create the coroutine, record it as a child of ``parent_coro_id`` and watch its deletion."""
        coro: CoroWrapperBase = self._loop.put_coro(coro_worker, *args, **kwargs)
        child_coro_id: CoroID = coro.coro_id
        if parent_coro_id not in self._tree_children_by_parent:
            self._tree_children_by_parent[parent_coro_id] = set()

        self._tree_children_by_parent[parent_coro_id].add(child_coro_id)
        self._tree_parent_by_child[child_coro_id] = parent_coro_id
        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        return coro

    def _single_task_registration_or_immediate_processing_single_impl_without_tree(
            self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Plain creation used while tree monitoring is off."""
        return self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs)

    def _single_task_registration_or_immediate_processing_single_impl_without_tree_impl(
            self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Create the coroutine; ``parent_coro_id`` is accepted for signature parity and ignored."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)

    def _turn_on_tree_monitoring(self, turn_on: bool):
        """Switch tree monitoring on or off by swapping the creation implementations.

        Returns:
            ``(True, previous_state, None)``.
        """
        previous_state: bool = self._tree_monitoring_turned_on
        self._tree_monitoring_turned_on = turn_on
        if turn_on:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl
        else:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl

        return True, previous_state, None

    def put_from_other_service(
            self, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
        ) -> CoroWrapperBase:
        """Create a coroutine with ``coro_id`` as the explicit parent id."""
        return self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)

    def put_task_from_other_service(
            self, coro_id: CoroID, coro_worker: AnyWorker, args, kwargs
        ) -> Task:
        """Like ``put_from_other_service`` but takes packed ``args``/``kwargs`` and returns a ``Task``."""
        coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
        return Task(coro)

    def put_root_from_other_service(
            self, coro_worker: AnyWorker, *args, **kwargs
        ) -> CoroWrapperBase:
        """Create a coroutine directly on the loop, bypassing the tree bookkeeping."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)

    def _tree_monitoring_state(self):
        """Return ``(True, current_monitoring_state, None)``."""
        return True, self._tree_monitoring_turned_on, None

    def _put_background_coro(self, coro_worker: AnyWorker, args, kwargs):
        """Create a coroutine and flag it as a background one.

        Returns:
            ``(True, coro_id, None)`` or ``(True, None, exception)`` on failure.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = True
        except:
            return True, None, get_exception()

        return True, coro.coro_id, None

    def _task(self, coro_worker: AnyWorker, args: Tuple, kwargs: Dict, background: bool):
        """Create a coroutine, set its background flag and wrap it into a ``Task``.

        Returns:
            ``(True, Task, None)`` or ``(True, None, exception)`` on failure.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = background
        except:
            return True, None, get_exception()

        return True, Task(coro), None

    def full_processing_iteration(self):
        """Drain dead coroutines from the tree, then start the queued direct requests."""
        if self._dead_coroutines:
            dead_coroutines_buff = self._dead_coroutines
            self._dead_coroutines = type(dead_coroutines_buff)()
            for dead_coro_id in dead_coroutines_buff:
                # Pop (instead of get) so a dead coroutine leaves no stale
                # entries behind in either map.
                parent_coro_id = self._tree_parent_by_child.pop(dead_coro_id, None)
                children = self._tree_children_by_parent.pop(dead_coro_id, None)
                if parent_coro_id is None:
                    # No known parent: the children simply become parentless.
                    if children is not None:
                        for child_coro_id in children:
                            self._tree_parent_by_child.pop(child_coro_id, None)
                    continue

                # Re-attach the children of the dead coroutine to its parent.
                siblings = self._tree_children_by_parent.setdefault(parent_coro_id, set())
                siblings.discard(dead_coro_id)
                if children is not None:
                    siblings.update(children)
                    for child_coro_id in children:
                        self._tree_parent_by_child[child_coro_id] = parent_coro_id

                if not siblings:
                    # Do not keep empty child sets around.
                    del self._tree_children_by_parent[parent_coro_id]

        if self.direct_requests:
            direct_requests_buff = self.direct_requests
            self.direct_requests = type(direct_requests_buff)()
            for coro_worker, args, kwargs in direct_requests_buff:
                try:
                    self._loop.put_coro(coro_worker, *args, **kwargs)
                except:
                    # NOTE(review): re-raising here drops the not yet processed
                    # rest of `direct_requests_buff`.
                    ex_type, exception, tracback = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, tracback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[None]:
        """Queue a creation request for the next ``full_processing_iteration()``; returns ``(False, None)``."""
        self.direct_requests.append((coro_worker, args, kwargs))
        self.make_live()
        return (False, None)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Remember the deleted coroutine's id for later tree cleanup; returns ``False``."""
        self._dead_coroutines.add(coro.coro_id)
        self.make_live()
        return False

    def travers_through_all_children(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run ``KeyMultiValueTreeTraversal`` over the children map starting at ``coro_id``."""
        t = KeyMultiValueTreeTraversal(self._tree_children_by_parent, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)

    def get_set_of_all_children(self, coro_id) -> Set[CoroID]:
        """Collect every child id reported by the children traversal, excluding ``coro_id`` itself."""
        result = set()
        def handler(deep, parent, child: ValueExistence[CoroID], index):
            if child:
                result.add(child[1])

        self.travers_through_all_children(coro_id, handler)
        result.discard(coro_id)
        return result

    def travers_through_all_parents(self, coro_id, handler: Callable[[CoroID, CoroID], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run ``KeyValueTreeTraversal`` over the parent map starting at ``coro_id``."""
        t = KeyValueTreeTraversal(self._tree_parent_by_child, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)

    def get_set_of_all_parents(self, coro_id):
        """Collect every non-``None`` parent id reported by the parents traversal, excluding ``coro_id`` itself."""
        result = set()
        def handler(deep, child, parent, index):
            if parent is not None:
                result.add(parent)

        self.travers_through_all_parents(coro_id, handler)
        result.discard(coro_id)
        return result

    def get_set_of_all_coros(self):
        """Return the union of all ids known to the tree, as parents or as children."""
        result = set(self._tree_children_by_parent.keys())
        result.update(self._tree_parent_by_child.keys())
        return result

    def in_work(self) -> bool:
        """Report pending work: queued direct requests or dead coroutines awaiting cleanup."""
        result: bool = bool(self.direct_requests) or bool(self._dead_coroutines)
        return self.thrifty_in_work(result)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

PutCoro( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
151    def __init__(self, loop: CoroSchedulerType):
152        super(PutCoro, self).__init__(loop)
153        self._tree_monitoring_turned_on: bool = None
154        self._single_task_registration_or_immediate_processing_single_impl: Callable = None
155        self._single_task_registration_or_immediate_processing_single_impl_impl: Callable = None
156        self._turn_on_tree_monitoring(False)
157        self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = dict()
158        self._tree_parent_by_child: Dict[CoroID, CoroID] = dict()
159        self._dead_coroutines: Set[CoroID] = set()
160
161        self._request_workers = {
162            0: self._turn_on_tree_monitoring,
163            1: self._tree_monitoring_state,
164            4: self._put_background_coro,
165            5: self._task,
166        }
167
168        self.direct_requests: List[Tuple] = list()
direct_requests: List[Tuple]
def put_from_other_service( self, coro_id: int, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase:
220    def put_from_other_service(
221            self, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
222        ) -> CoroWrapperBase:
223        return self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
def put_task_from_other_service( self, coro_id: int, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], args, kwargs) -> Task:
225    def put_task_from_other_service(
226            self, coro_id: CoroID, coro_worker: AnyWorker, args, kwargs
227        ) -> Task:
228        coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
229        return Task(coro)
def put_root_from_other_service( self, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase:
231    def put_root_from_other_service(
232            self, coro_worker: AnyWorker, *args, **kwargs
233        ) -> CoroWrapperBase:
234        return self._loop.put_coro(coro_worker, *args, **kwargs)
def full_processing_iteration(self):
257    def full_processing_iteration(self):
258        if self._dead_coroutines:
259            dead_coroutines_buff = self._dead_coroutines
260            self._dead_coroutines = type(dead_coroutines_buff)()
261            for dead_coro_id in dead_coroutines_buff:
262                parent_coro_id = self._tree_parent_by_child.get(dead_coro_id, None)
263                children = self._tree_children_by_parent.get(dead_coro_id, None)
264                if parent_coro_id is None:
265                    if children is not None:
266                        for child_coro_id in children:
267                            self._tree_parent_by_child.pop(child_coro_id, None)
268                else:
269                    sibblings = self._tree_children_by_parent[parent_coro_id]
270                    sibblings.discard(dead_coro_id)
271                    if children is not None:
272                        sibblings.update(children)
273                        for child_coro_id in children:
274                            self._tree_parent_by_child[child_coro_id] = parent_coro_id
275
276        if self.direct_requests:
277            direct_requests_buff = self.direct_requests
278            self.direct_requests = type(direct_requests_buff)()
279            for coro_worker, args, kwargs in direct_requests_buff:
280                try:
281                    coro = self._loop.put_coro(coro_worker, *args, **kwargs)
282                except:
283                    ex_type, exception, tracback = get_exception_tripple()
284                    if __debug__: dlog(ex_type, exception, tracback)
285                    raise
286
287        self.make_dead()
def travers_through_all_children( self, coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None):
299    def travers_through_all_children(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
300        t = KeyMultiValueTreeTraversal(self._tree_children_by_parent, None, handler, on_switched_to_stack_based_implementation)
301        t(coro_id)
def get_set_of_all_children(self, coro_id) -> Set[int]:
303    def get_set_of_all_children(self, coro_id) -> Set[CoroID]:
304        result = set()
305        def handler(deep, parent, child: ValueExistence[CoroID], index):
306            if child:
307                result.add(child[1])
308        
309        self.travers_through_all_children(coro_id, handler)
310        result.discard(coro_id)
311        return result
def travers_through_all_parents( self, coro_id, handler: typing.Callable[[int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None):
313    def travers_through_all_parents(self, coro_id, handler: Callable[[CoroID, CoroID], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
314        t = KeyValueTreeTraversal(self._tree_parent_by_child, None, handler, on_switched_to_stack_based_implementation)
315        t(coro_id)
def get_set_of_all_parents(self, coro_id):
317    def get_set_of_all_parents(self, coro_id):
318        result = set()
319        def handler(deep, child, parent, index):
320            if parent is not None:
321                result.add(parent)
322        
323        self.travers_through_all_parents(coro_id, handler)
324        result.discard(coro_id)
325        return result
def get_set_of_all_coros(self):
327    def get_set_of_all_coros(self):
328        result = set(self._tree_children_by_parent.keys())
329        result.update(self._tree_parent_by_child.keys())
330        return result
def in_work(self) -> bool:
332    def in_work(self) -> bool:
333        result: bool = bool(self.direct_requests) or bool(self._dead_coroutines)
334        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
single_task_registration_or_immediate_processing
single_task_registration_or_immediate_processing_multiple
single_task_registration_or_immediate_processing_single
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
class PutCoroRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class PutCoroRequest(ServiceRequest):
    """Request builder for the ``PutCoro`` service.

    Every method stores a numeric request type code plus its arguments via
    ``_save`` and returns what ``_save`` returns.
    """

    def turn_on_tree_monitoring(self, turn_on: bool) -> bool:
        """Request type 0: switch tree monitoring on or off."""
        return self._save(0, turn_on)

    def tree_monitoring_state(self) -> bool:
        """Request type 1: query the tree monitoring state."""
        return self._save(1)

    # TODO: An implementation required:
    def set_on_children_start_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request type 2: set a handler for children start (not implemented yet, see TODO)."""
        return self._save(2, parent_coro_id, handler)

    # TODO: An implementation required:
    def set_on_children_del_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request type 3: set a handler for children deletion (not implemented yet, see TODO)."""
        return self._save(3, parent_coro_id, handler)

    def put_background_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 4: start ``coro_worker`` as a background coroutine."""
        return self._save(4, coro_worker, args, kwargs)

    def task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 5 with ``background=False``: start ``coro_worker`` as a task."""
        return self._save(5, coro_worker, args, kwargs, False)

    def background_task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request type 5 with ``background=True``: start ``coro_worker`` as a background task."""
        return self._save(5, coro_worker, args, kwargs, True)
def turn_on_tree_monitoring(self, turn_on: bool) -> bool:
126    def turn_on_tree_monitoring(self, turn_on: bool) -> bool:
127        return self._save(0, turn_on)
def tree_monitoring_state(self) -> bool:
129    def tree_monitoring_state(self) -> bool:
130        return self._save(1)
def set_on_children_start_handler( self, parent_coro_id: int, handler: typing.Callable) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
133    def set_on_children_start_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
134        return self._save(2, parent_coro_id, handler)
def set_on_children_del_handler( self, parent_coro_id: int, handler: typing.Callable) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
137    def set_on_children_del_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
138        return self._save(3, parent_coro_id, handler)
def put_background_coro( self, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
140    def put_background_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
141        return self._save(4, coro_worker, args, kwargs)
def task( self, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
143    def task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
144        return self._save(5, coro_worker, args, kwargs, False)
def background_task( self, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
146    def background_task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
147        return self._save(5, coro_worker, args, kwargs, True)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'PutCoro'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
def put_current_from_other_service( current_service: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase:
def put_current_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Start ``coro_worker`` from inside another service, on behalf of its current caller.

    The parent id passed to ``PutCoro.put_from_other_service`` is
    ``current_service.current_caller_coro_info.coro_id``.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    caller_coro_id = current_service.current_caller_coro_info.coro_id
    return service.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
def put_from_other_service( current_service: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, coro_id: int, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase:
def put_from_other_service(current_service: Service, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Start ``coro_worker`` from inside another service with ``coro_id`` as the explicit parent id.

    The ``PutCoro`` instance is looked up on ``current_service``'s loop.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    return service.put_from_other_service(coro_id, coro_worker, *args, **kwargs)
def put_root_from_other_service( current_service: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase:
def put_root_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Start ``coro_worker`` from inside another service without any parent coroutine.

    The ``PutCoro`` instance is looked up on ``current_service``'s loop.
    """
    service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    return service.put_root_from_other_service(coro_worker, *args, **kwargs)
def put_coro_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Ask the PutCoro service to start ``coro_worker`` using an explicitly provided context.

        context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

        An example:

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            try:
                result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
                if result:
                    print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                    resulting_coro_id = result.value
                else:
                    print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
            except CoroSchedulerContextIsNotAvailable:
                print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): context tuple produced by one of the functions listed above.
        coro_worker (AnyWorker): worker to start; ``*args`` and ``**kwargs`` are forwarded together with it.

    Returns:
        ValueExistence[Optional[CoroID]]: whatever ``make_request_to_service_with_context`` returns; per the example above it is truthy and carries the new coroutine id when the coroutine was created immediately.
    """
    outcome = make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return outcome

Ask the PutCoro service to start `coro_worker` using an explicitly provided context. The context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

An example:

from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
from typing import Optional, Union

def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
    resulting_coro_id: Optional[CoroID] = None
    try:
        result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
        if result:
            print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
            resulting_coro_id = result.value
        else:
            print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
    except CoroSchedulerContextIsNotAvailable:
        print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None')

    return resulting_coro_id

Args: context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ coro_worker (AnyWorker): _description_

Returns: ValueExistence[Optional[CoroID]]: _description_

def try_put_coro_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Request the PutCoro service to start `coro_worker` using an explicitly supplied context.

    The request is forwarded to `try_make_request_to_service_with_context`.

    `context` can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

    An example::

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            result = try_put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
            if result:
                print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                resulting_coro_id = result.value
            else:
                print('There are two possibilities:')
                print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
                print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): context tuple
            produced by one of the helper functions listed above.
        coro_worker (AnyWorker): worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        ValueExistence[Optional[CoroID]]: truthy, with the id of the created coroutine in
        `.value`, when the coroutine already exists at return time; falsy otherwise (see the
        two possibilities described in the example).
    """
    return try_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)

_summary_ context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

An example:

from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to
from typing import Optional, Union

def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
    resulting_coro_id: Optional[CoroID] = None
    result = try_put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
    if result:
        print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
        resulting_coro_id = result.value
    else:
        print('There are two possibilities:')
        print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
        print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.')

    return resulting_coro_id

Args: context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ coro_worker (AnyWorker): _description_

Returns: ValueExistence[Optional[CoroID]]: _description_

async def aput_coro_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
async def aput_coro_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_worker: AnyWorker,
    *args,
    **kwargs
) -> ValueExistence[CoroID]:
    """Awaitable request to the PutCoro service for `coro_worker`, using an explicit `context`.

    Args:
        context: context tuple passed through to `amake_request_to_service_with_context`.
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The awaited result of `amake_request_to_service_with_context`
        (annotated as ValueExistence[CoroID]).
    """
    response = await amake_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return response
async def atry_put_coro_to( context: typing.Tuple[typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, NoneType], bool], coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
async def atry_put_coro_to(
    context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool],
    coro_worker: AnyWorker,
    *args,
    **kwargs
) -> ValueExistence[Optional[CoroID]]:
    """Awaitable "try" request to the PutCoro service for `coro_worker`, using an explicit `context`.

    Args:
        context: context tuple passed through to `atry_make_request_to_service_with_context`.
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The awaited result of `atry_make_request_to_service_with_context`
        (annotated as ValueExistence[Optional[CoroID]]).
    """
    response = await atry_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
    return response
def put_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]:
    """Send a PutCoro request for `coro_worker` through `make_request_to_service`.

    Args:
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The result of `make_request_to_service` (annotated as ValueExistence[CoroID]).
    """
    outcome = make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return outcome
def try_put_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Send a PutCoro request for `coro_worker` through `try_make_request_to_service`.

    Args:
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The result of `try_make_request_to_service`
        (annotated as ValueExistence[Optional[CoroID]]).
    """
    outcome = try_make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return outcome
async def aput_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
async def aput_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]:
    """Awaitable PutCoro request for `coro_worker` through `amake_request_to_service`.

    Args:
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The awaited result of `amake_request_to_service`
        (annotated as ValueExistence[CoroID]).
    """
    response = await amake_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return response
async def atry_put_coro( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
async def atry_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Awaitable "try" PutCoro request for `coro_worker` through `atry_make_request_to_service`.

    Args:
        coro_worker: worker handed to the PutCoro service.
        *args: positional arguments forwarded together with `coro_worker`.
        **kwargs: keyword arguments forwarded together with `coro_worker`.

    Returns:
        The awaited result of `atry_make_request_to_service`
        (annotated as ValueExistence[Optional[CoroID]]).
    """
    response = await atry_make_request_to_service(PutCoro, coro_worker, *args, **kwargs)
    return response
def travers_through_all_coro_children_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def travers_through_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[CoroID]:
    """Delegate to `PutCoro.travers_through_all_children` on the service of the given scheduler.

    The PutCoro service instance is obtained with `service_with_explicit_loop`.

    Args:
        prioritized_coro_scheduler: scheduler passed to `service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.
        handler: callback forwarded to the service; takes four positional arguments
            (int, CoroID, CoroID, int) per its annotation.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        Whatever `PutCoro.travers_through_all_children` returns.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.travers_through_all_children(
        coro_id, handler, on_switched_to_stack_based_implementation
    )
def try_travers_through_all_coro_children_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_travers_through_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[Optional[CoroID]]:
    """Delegate to `PutCoro.travers_through_all_children` if the service can be obtained.

    The PutCoro service instance is looked up with `get_service_with_explicit_loop`.

    Args:
        prioritized_coro_scheduler: scheduler passed to `get_service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.
        handler: callback forwarded to the service; takes four positional arguments
            (int, CoroID, CoroID, int) per its annotation.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        Whatever `PutCoro.travers_through_all_children` returns, or None when
        `get_service_with_explicit_loop` yields None.
    """
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        # The service lookup produced nothing, so there is nothing to delegate to.
        return None

    return service.travers_through_all_children(
        coro_id, handler, on_switched_to_stack_based_implementation
    )
def travers_through_all_coro_children( coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def travers_through_all_coro_children(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[CoroID]:
    """Call `travers_through_all_coro_children_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.
        handler: callback forwarded unchanged.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        The result of `travers_through_all_coro_children_on`.
    """
    scheduler = get_available_coro_scheduler()
    return travers_through_all_coro_children_on(
        scheduler, coro_id, handler, on_switched_to_stack_based_implementation
    )
def try_travers_through_all_coro_children( coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_travers_through_all_coro_children(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[Optional[CoroID]]:
    """Call `try_travers_through_all_coro_children_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.
        handler: callback forwarded unchanged.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        The result of `try_travers_through_all_coro_children_on`.
    """
    scheduler = get_available_coro_scheduler()
    return try_travers_through_all_coro_children_on(
        scheduler, coro_id, handler, on_switched_to_stack_based_implementation
    )
def get_set_of_all_coro_children_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id) -> Set[int]:
def get_set_of_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id
) -> Set[CoroID]:
    """Return `PutCoro.get_set_of_all_children(coro_id)` from the service of the given scheduler.

    Args:
        prioritized_coro_scheduler: scheduler passed to `service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.

    Returns:
        Whatever `PutCoro.get_set_of_all_children` returns (annotated as Set[CoroID]).
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    children = service.get_set_of_all_children(coro_id)
    return children
def try_get_set_of_all_coro_children_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id) -> Union[Set[int], NoneType]:
def try_get_set_of_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id
) -> Optional[Set[CoroID]]:
    """Return `PutCoro.get_set_of_all_children(coro_id)` if the service can be obtained.

    Args:
        prioritized_coro_scheduler: scheduler passed to `get_service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.

    Returns:
        Whatever `PutCoro.get_set_of_all_children` returns, or None when
        `get_service_with_explicit_loop` yields None.
    """
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        # The service lookup produced nothing, so there is nothing to query.
        return None

    return service.get_set_of_all_children(coro_id)
def get_set_of_all_coro_children(coro_id) -> Set[int]:
def get_set_of_all_coro_children(coro_id) -> Set[CoroID]:
    """Call `get_set_of_all_coro_children_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.

    Returns:
        The result of `get_set_of_all_coro_children_on`.
    """
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coro_children_on(scheduler, coro_id)
def try_get_set_of_all_coro_children(coro_id) -> Union[Set[int], NoneType]:
def try_get_set_of_all_coro_children(coro_id) -> Optional[Set[CoroID]]:
    """Call `try_get_set_of_all_coro_children_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.

    Returns:
        The result of `try_get_set_of_all_coro_children_on`.
    """
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coro_children_on(scheduler, coro_id)
def travers_through_all_coro_parents_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def travers_through_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[CoroID]:
    """Delegate to `PutCoro.travers_through_all_parents` on the service of the given scheduler.

    The PutCoro service instance is obtained with `service_with_explicit_loop`.

    Args:
        prioritized_coro_scheduler: scheduler passed to `service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.
        handler: callback forwarded to the service; takes four positional arguments
            (int, CoroID, CoroID, int) per its annotation.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        Whatever `PutCoro.travers_through_all_parents` returns.
    """
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return service.travers_through_all_parents(
        coro_id, handler, on_switched_to_stack_based_implementation
    )
def try_travers_through_all_coro_parents_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_travers_through_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[Optional[CoroID]]:
    """Delegate to `PutCoro.travers_through_all_parents` if the service can be obtained.

    The PutCoro service instance is looked up with `get_service_with_explicit_loop`.

    Args:
        prioritized_coro_scheduler: scheduler passed to `get_service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.
        handler: callback forwarded to the service; takes four positional arguments
            (int, CoroID, CoroID, int) per its annotation.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        Whatever `PutCoro.travers_through_all_parents` returns, or None when
        `get_service_with_explicit_loop` yields None.
    """
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        # The service lookup produced nothing, so there is nothing to delegate to.
        return None

    return service.travers_through_all_parents(
        coro_id, handler, on_switched_to_stack_based_implementation
    )
def travers_through_all_coro_parents( coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def travers_through_all_coro_parents(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[CoroID]:
    """Call `travers_through_all_coro_parents_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.
        handler: callback forwarded unchanged.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        The result of `travers_through_all_coro_parents_on`.
    """
    scheduler = get_available_coro_scheduler()
    return travers_through_all_coro_parents_on(
        scheduler, coro_id, handler, on_switched_to_stack_based_implementation
    )
def try_travers_through_all_coro_parents( coro_id, handler: typing.Callable[[int, int, int, int], NoneType], on_switched_to_stack_based_implementation: typing.Union[typing.Callable, NoneType] = None) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_travers_through_all_coro_parents(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None
) -> ValueExistence[Optional[CoroID]]:
    """Call `try_travers_through_all_coro_parents_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.
        handler: callback forwarded unchanged.
        on_switched_to_stack_based_implementation: optional callable forwarded unchanged.

    Returns:
        The result of `try_travers_through_all_coro_parents_on`.
    """
    scheduler = get_available_coro_scheduler()
    return try_travers_through_all_coro_parents_on(
        scheduler, coro_id, handler, on_switched_to_stack_based_implementation
    )
def get_set_of_all_coro_parents_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def get_set_of_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id
) -> ValueExistence[CoroID]:
    """Return `PutCoro.get_set_of_all_parents(coro_id)` from the service of the given scheduler.

    Args:
        prioritized_coro_scheduler: scheduler passed to `service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.

    Returns:
        Whatever `PutCoro.get_set_of_all_parents` returns.
    """
    # NOTE(review): the return annotation is ValueExistence[CoroID], while the children
    # counterpart `get_set_of_all_coro_children_on` is annotated Set[CoroID] - confirm.
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    parents = service.get_set_of_all_parents(coro_id)
    return parents
def try_get_set_of_all_coro_parents_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType], coro_id) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_get_set_of_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id
) -> ValueExistence[Optional[CoroID]]:
    """Return `PutCoro.get_set_of_all_parents(coro_id)` if the service can be obtained.

    Args:
        prioritized_coro_scheduler: scheduler passed to `get_service_with_explicit_loop`.
        coro_id: coroutine id forwarded to the service.

    Returns:
        Whatever `PutCoro.get_set_of_all_parents` returns, or None when
        `get_service_with_explicit_loop` yields None.
    """
    # NOTE(review): the return annotation is ValueExistence[Optional[CoroID]], while the
    # children counterpart `try_get_set_of_all_coro_children_on` is annotated
    # Optional[Set[CoroID]] - confirm.
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        # The service lookup produced nothing, so there is nothing to query.
        return None

    return service.get_set_of_all_parents(coro_id)
def get_set_of_all_coro_parents( coro_id) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def get_set_of_all_coro_parents(coro_id) -> ValueExistence[CoroID]:
    """Call `get_set_of_all_coro_parents_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.

    Returns:
        The result of `get_set_of_all_coro_parents_on`.
    """
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coro_parents_on(scheduler, coro_id)
def try_get_set_of_all_coro_parents( coro_id) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_get_set_of_all_coro_parents(coro_id) -> ValueExistence[Optional[CoroID]]:
    """Call `try_get_set_of_all_coro_parents_on` with the scheduler from `get_available_coro_scheduler()`.

    Args:
        coro_id: coroutine id forwarded unchanged.

    Returns:
        The result of `try_get_set_of_all_coro_parents_on`.
    """
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coro_parents_on(scheduler, coro_id)
def get_set_of_all_coros_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def get_set_of_all_coros_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType]
) -> ValueExistence[CoroID]:
    """Return `PutCoro.get_set_of_all_coros()` from the service of the given scheduler.

    Args:
        prioritized_coro_scheduler: scheduler passed to `service_with_explicit_loop`.

    Returns:
        Whatever `PutCoro.get_set_of_all_coros` returns.
    """
    # NOTE(review): the name suggests a set of coroutine ids, yet the return annotation
    # is ValueExistence[CoroID] - confirm against PutCoro.get_set_of_all_coros.
    service: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    all_coros = service.get_set_of_all_coros()
    return all_coros
def try_get_set_of_all_coros_on( prioritized_coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType]) -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_get_set_of_all_coros_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType]
) -> ValueExistence[Optional[CoroID]]:
    """Return `PutCoro.get_set_of_all_coros()` if the service can be obtained.

    Args:
        prioritized_coro_scheduler: scheduler passed to `get_service_with_explicit_loop`.

    Returns:
        Whatever `PutCoro.get_set_of_all_coros` returns, or None when
        `get_service_with_explicit_loop` yields None.
    """
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        # The service lookup produced nothing, so there is nothing to query.
        return None

    return service.get_set_of_all_coros()
def get_set_of_all_coros() -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[int]:
def get_set_of_all_coros() -> ValueExistence[CoroID]:
    """Call `get_set_of_all_coros_on` with the scheduler from `get_available_coro_scheduler()`.

    Returns:
        The result of `get_set_of_all_coros_on`.
    """
    scheduler = get_available_coro_scheduler()
    return get_set_of_all_coros_on(scheduler)
def try_get_set_of_all_coros() -> cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence[typing.Union[int, NoneType]]:
def try_get_set_of_all_coros() -> ValueExistence[Optional[CoroID]]:
    """Call `try_get_set_of_all_coros_on` with the scheduler from `get_available_coro_scheduler()`.

    Returns:
        The result of `try_get_set_of_all_coros_on`.
    """
    scheduler = get_available_coro_scheduler()
    return try_get_set_of_all_coros_on(scheduler)