cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = [ 38 'TaskIsNotFinished', 'TaskIsFinished', 'Task', 39 'PutCoro', 'PutCoroRequest', 'put_current_from_other_service', 'put_from_other_service', 'put_root_from_other_service', 40 'put_coro_to', 'try_put_coro_to', 'aput_coro_to', 'atry_put_coro_to', 'put_coro', 'try_put_coro', 'aput_coro', 41 'atry_put_coro', 'travers_through_all_coro_children_on', 'try_travers_through_all_coro_children_on', 42 'travers_through_all_coro_children', 'try_travers_through_all_coro_children', 'get_set_of_all_coro_children_on', 43 'try_get_set_of_all_coro_children_on', 'get_set_of_all_coro_children', 'try_get_set_of_all_coro_children', 44 'travers_through_all_coro_parents_on', 'try_travers_through_all_coro_parents_on', 
'travers_through_all_coro_parents', 45 'try_travers_through_all_coro_parents', 'get_set_of_all_coro_parents_on', 'try_get_set_of_all_coro_parents_on', 46 'get_set_of_all_coro_parents', 'try_get_set_of_all_coro_parents', 'get_set_of_all_coros_on', 47 'try_get_set_of_all_coros_on', 'get_set_of_all_coros', 'try_get_set_of_all_coros'] 48 49from cengal.parallel_execution.coroutines.coro_scheduler import * 50from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import * 51from cengal.code_flow_control.smart_values import ValueExistence 52from cengal.introspection.inspect import get_exception, get_exception_tripple 53from cengal.data_manipulation.tree_traversal import KeyMultiValueTreeTraversal, KeyValueTreeTraversal 54from typing import Hashable, Tuple, List, Optional, Any, Union, cast, Dict, Callable, Set 55 56 57class TaskIsNotFinished(Exception): 58 pass 59 60 61class TaskIsFinished(Exception): 62 pass 63 64 65class Task: 66 def __init__(self, coro_wrapper: CoroWrapperBase): 67 self._coro_wrapper = coro_wrapper 68 self._coro_id = coro_wrapper.coro_id 69 self._coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler) 70 self._done: bool = False 71 self._result: Any = None 72 self._exception: Exception = None 73 self._on_done_handlers: Set[Callable[['Task'], None]] = set() 74 75 def _on_coro_del_handler(self, coro_wrapper: CoroWrapperBase) -> bool: 76 self._result = self._coro_wrapper.last_result 77 self._exception = self._coro_wrapper.exception 78 self._coro_wrapper = None 79 self._done = True 80 for handler in self._on_done_handlers: 81 handler(self) 82 83 return True 84 85 @property 86 def coro_id(self): 87 return self._coro_id 88 89 @property 90 def coro_wrapper(self): 91 if self._done: 92 raise TaskIsFinished() 93 94 return self._coro_wrapper 95 96 @property 97 def result(self): 98 if not self._done: 99 raise TaskIsNotFinished() 100 101 if self._exception is not None: 102 raise self._exception 103 104 
return self._result 105 106 def done(self) -> bool: 107 return self._done 108 109 def __bool__(self) -> bool: 110 return self._done 111 112 def __nonzero__(self): 113 return self.__bool__() 114 115 def add_on_done_handler(self, handler: Callable[['Task'], None]) -> bool: 116 if self._done: 117 handler(self) 118 else: 119 self._on_done_handlers.add(handler) 120 121 return True 122 123 124class PutCoroRequest(ServiceRequest): 125 def turn_on_tree_monitoring(self, turn_on: bool) -> bool: 126 return self._save(0, turn_on) 127 128 def tree_monitoring_state(self) -> bool: 129 return self._save(1) 130 131 # TODO: An implementation required: 132 def set_on_children_start_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest: 133 return self._save(2, parent_coro_id, handler) 134 135 # TODO: An implementation required: 136 def set_on_children_del_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest: 137 return self._save(3, parent_coro_id, handler) 138 139 def put_background_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest: 140 return self._save(4, coro_worker, args, kwargs) 141 142 def task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest: 143 return self._save(5, coro_worker, args, kwargs, False) 144 145 def background_task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest: 146 return self._save(5, coro_worker, args, kwargs, True) 147 148 149class PutCoro(ServiceWithADirectRequestMixin, DualImmediateProcessingServiceMixin, TypedService[CoroID]): 150 def __init__(self, loop: CoroSchedulerType): 151 super(PutCoro, self).__init__(loop) 152 self._tree_monitoring_turned_on: bool = None 153 self._single_task_registration_or_immediate_processing_single_impl: Callable = None 154 self._single_task_registration_or_immediate_processing_single_impl_impl: Callable = None 155 self._turn_on_tree_monitoring(False) 156 self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = dict() 157 
self._tree_parent_by_child: Dict[CoroID, CoroID] = dict() 158 self._dead_coroutines: Set[CoroID] = set() 159 160 self._request_workers = { 161 0: self._turn_on_tree_monitoring, 162 1: self._tree_monitoring_state, 163 4: self._put_background_coro, 164 5: self._task, 165 } 166 167 self.direct_requests: List[Tuple] = list() 168 169 def single_task_registration_or_immediate_processing_single( 170 self, coro_worker: AnyWorker, *args, **kwargs 171 ) -> Tuple[bool, Optional[CoroID], Any]: 172 try: 173 coro = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs) 174 except: 175 return True, None, get_exception() 176 177 return True, coro.coro_id, None 178 179 def _single_task_registration_or_immediate_processing_single_impl_with_tree( 180 self, coro_worker: AnyWorker, *args, **kwargs 181 ) -> CoroWrapperBase: 182 return self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs) 183 184 def _single_task_registration_or_immediate_processing_single_impl_with_tree_impl( 185 self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs 186 ) -> CoroWrapperBase: 187 coro: CoroWrapperBase = self._loop.put_coro(coro_worker, *args, **kwargs) 188 child_coro_id: CoroID = coro.coro_id 189 if parent_coro_id not in self._tree_children_by_parent: 190 self._tree_children_by_parent[parent_coro_id] = set() 191 192 self._tree_children_by_parent[parent_coro_id].add(child_coro_id) 193 self._tree_parent_by_child[child_coro_id] = parent_coro_id 194 coro.add_on_coro_del_handler(self._on_coro_del_handler) 195 return coro 196 197 def _single_task_registration_or_immediate_processing_single_impl_without_tree( 198 self, coro_worker: AnyWorker, *args, **kwargs 199 ) -> CoroWrapperBase: 200 return self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl(self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs) 201 202 def 
_single_task_registration_or_immediate_processing_single_impl_without_tree_impl( 203 self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs 204 ) -> CoroWrapperBase: 205 return self._loop.put_coro(coro_worker, *args, **kwargs) 206 207 def _turn_on_tree_monitoring(self, turn_on: bool): 208 previous_state: bool = self._tree_monitoring_turned_on 209 self._tree_monitoring_turned_on = turn_on 210 if turn_on: 211 self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree 212 self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl 213 else: 214 self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree 215 self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl 216 217 return True, previous_state, None 218 219 def put_from_other_service( 220 self, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs 221 ) -> CoroWrapperBase: 222 return self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs) 223 224 def put_task_from_other_service( 225 self, coro_id: CoroID, coro_worker: AnyWorker, args, kwargs 226 ) -> Task: 227 coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs) 228 return Task(coro) 229 230 def put_root_from_other_service( 231 self, coro_worker: AnyWorker, *args, **kwargs 232 ) -> CoroWrapperBase: 233 return self._loop.put_coro(coro_worker, *args, **kwargs) 234 235 def _tree_monitoring_state(self): 236 return True, self._tree_monitoring_turned_on, None 237 238 def _put_background_coro(self, coro_worker: AnyWorker, args, kwargs): 239 try: 240 
coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs) 241 coro.is_background_coro = True 242 except: 243 return True, None, get_exception() 244 245 return True, coro.coro_id, None 246 247 def _task(self, coro_worker: AnyWorker, args: Tuple, kwargs: Dict, background: bool): 248 try: 249 coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs) 250 coro.is_background_coro = background 251 except: 252 return True, None, get_exception() 253 254 return True, Task(coro), None 255 256 def full_processing_iteration(self): 257 if self._dead_coroutines: 258 dead_coroutines_buff = self._dead_coroutines 259 self._dead_coroutines = type(dead_coroutines_buff)() 260 for dead_coro_id in dead_coroutines_buff: 261 parent_coro_id = self._tree_parent_by_child.get(dead_coro_id, None) 262 children = self._tree_children_by_parent.get(dead_coro_id, None) 263 if parent_coro_id is None: 264 if children is not None: 265 for child_coro_id in children: 266 self._tree_parent_by_child.pop(child_coro_id, None) 267 else: 268 sibblings = self._tree_children_by_parent[parent_coro_id] 269 sibblings.discard(dead_coro_id) 270 if children is not None: 271 sibblings.update(children) 272 for child_coro_id in children: 273 self._tree_parent_by_child[child_coro_id] = parent_coro_id 274 275 if self.direct_requests: 276 direct_requests_buff = self.direct_requests 277 self.direct_requests = type(direct_requests_buff)() 278 for coro_worker, args, kwargs in direct_requests_buff: 279 try: 280 coro = self._loop.put_coro(coro_worker, *args, **kwargs) 281 except: 282 ex_type, exception, tracback = get_exception_tripple() 283 if __debug__: dlog(ex_type, exception, tracback) 284 raise 285 286 self.make_dead() 287 288 def _add_direct_request(self, coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[None]: 289 self.direct_requests.append((coro_worker, args, kwargs)) 290 
self.make_live() 291 return (False, None) 292 293 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 294 self._dead_coroutines.add(coro.coro_id) 295 self.make_live() 296 return False 297 298 def travers_through_all_children(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None): 299 t = KeyMultiValueTreeTraversal(self._tree_children_by_parent, None, handler, on_switched_to_stack_based_implementation) 300 t(coro_id) 301 302 def get_set_of_all_children(self, coro_id) -> Set[CoroID]: 303 result = set() 304 def handler(deep, parent, child: ValueExistence[CoroID], index): 305 if child: 306 result.add(child[1]) 307 308 self.travers_through_all_children(coro_id, handler) 309 result.discard(coro_id) 310 return result 311 312 def travers_through_all_parents(self, coro_id, handler: Callable[[CoroID, CoroID], None], on_switched_to_stack_based_implementation: Optional[Callable]=None): 313 t = KeyValueTreeTraversal(self._tree_parent_by_child, None, handler, on_switched_to_stack_based_implementation) 314 t(coro_id) 315 316 def get_set_of_all_parents(self, coro_id): 317 result = set() 318 def handler(deep, child, parent, index): 319 if parent is not None: 320 result.add(parent) 321 322 self.travers_through_all_parents(coro_id, handler) 323 result.discard(coro_id) 324 return result 325 326 def get_set_of_all_coros(self): 327 result = set(self._tree_children_by_parent.keys()) 328 result.update(self._tree_parent_by_child.keys()) 329 return result 330 331 def in_work(self) -> bool: 332 result: bool = bool(self.direct_requests) or bool(self._dead_coroutines) 333 return self.thrifty_in_work(result) 334 335 336PutCoroRequest.default_service_type = PutCoro 337 338 339def put_current_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase: 340 put_coro: PutCoro = current_service._loop.get_service_instance(PutCoro) 341 return 
put_coro.put_from_other_service(current_service.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs) 342 343 344def put_from_other_service(current_service: Service, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase: 345 put_coro: PutCoro = current_service._loop.get_service_instance(PutCoro) 346 return put_coro.put_from_other_service(coro_id, coro_worker, *args, **kwargs) 347 348 349def put_root_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase: 350 put_coro: PutCoro = current_service._loop.get_service_instance(PutCoro) 351 return put_coro.put_root_from_other_service(coro_worker, *args, **kwargs) 352 353 354def put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]: 355 """_summary_ 356 context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module 357 358 An example: 359 360 from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID 361 from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to 362 from typing import Optional, Union 363 364 def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]: 365 resulting_coro_id: Optional[CoroID] = None 366 try: 367 result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b) 368 if result: 369 print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. 
Coro id: {result.value}') 370 resulting_coro_id = result.value 371 else: 372 print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.') 373 except CoroSchedulerContextIsNotAvailable: 374 print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None) 375 376 return resulting_coro_id 377 378 Args: 379 context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ 380 coro_worker (AnyWorker): _description_ 381 382 Returns: 383 ValueExistence[Optional[CoroID]]: _description_ 384 """ 385 return make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs) 386 387 388def try_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]: 389 """_summary_ 390 context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module 391 392 An example: 393 394 from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID 395 from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to 396 from typing import Optional, Union 397 398 def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]: 399 resulting_coro_id: Optional[CoroID] = None 400 result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b) 401 if result: 402 print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. 
Coro id: {result.value}') 403 resulting_coro_id = result.value 404 else: 405 print('There are two possibilities:) 406 print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.') 407 print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.) 408 409 return resulting_coro_id 410 411 Args: 412 context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ 413 coro_worker (AnyWorker): _description_ 414 415 Returns: 416 ValueExistence[Optional[CoroID]]: _description_ 417 """ 418 return try_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs) 419 420 421async def aput_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]: 422 return await amake_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs) 423 424 425async def atry_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]: 426 return await atry_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs) 427 428 429def put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]: 430 return make_request_to_service(PutCoro, coro_worker, *args, **kwargs) 431 432 433def try_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]: 434 return try_make_request_to_service(PutCoro, coro_worker, *args, **kwargs) 435 436 437async def aput_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[CoroID]: 438 return await amake_request_to_service(PutCoro, coro_worker, *args, **kwargs) 439 440 441async def atry_put_coro(coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]: 442 return 
await atry_make_request_to_service(PutCoro, coro_worker, *args, **kwargs) 443 444 445# ================================================== 446 447 448def travers_through_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[CoroID]: 449 put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 450 return put_coro.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation) 451 452 453def try_travers_through_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[Optional[CoroID]]: 454 put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 455 if put_coro is not None: 456 return put_coro.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation) 457 458 459def travers_through_all_coro_children(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[CoroID]: 460 return travers_through_all_coro_children_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation) 461 462 463def try_travers_through_all_coro_children(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[Optional[CoroID]]: 464 return try_travers_through_all_coro_children_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation) 465 466 467# ================================================== 468 469 470def get_set_of_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], 
coro_id) -> Set[CoroID]: 471 put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 472 return put_coro.get_set_of_all_children(coro_id) 473 474 475def try_get_set_of_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Optional[Set[CoroID]]: 476 put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 477 if put_coro is not None: 478 return put_coro.get_set_of_all_children(coro_id) 479 480 481def get_set_of_all_coro_children(coro_id) -> Set[CoroID]: 482 return get_set_of_all_coro_children_on(get_available_coro_scheduler(), coro_id) 483 484 485def try_get_set_of_all_coro_children(coro_id) -> Optional[Set[CoroID]]: 486 return try_get_set_of_all_coro_children_on(get_available_coro_scheduler(), coro_id) 487 488 489# ================================================== 490 491 492def travers_through_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[CoroID]: 493 put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 494 return put_coro.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation) 495 496 497def try_travers_through_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[Optional[CoroID]]: 498 put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 499 if put_coro is not None: 500 return put_coro.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation) 501 502 503def travers_through_all_coro_parents(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: 
Optional[Callable]=None) -> ValueExistence[CoroID]: 504 return travers_through_all_coro_parents_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation) 505 506 507def try_travers_through_all_coro_parents(coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None) -> ValueExistence[Optional[CoroID]]: 508 return try_travers_through_all_coro_parents_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation) 509 510 511# ================================================== 512 513 514def get_set_of_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> ValueExistence[CoroID]: 515 put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 516 return put_coro.get_set_of_all_parents(coro_id) 517 518 519def try_get_set_of_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> ValueExistence[Optional[CoroID]]: 520 put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 521 if put_coro is not None: 522 return put_coro.get_set_of_all_parents(coro_id) 523 524 525def get_set_of_all_coro_parents(coro_id) -> ValueExistence[CoroID]: 526 return get_set_of_all_coro_parents_on(get_available_coro_scheduler(), coro_id) 527 528 529def try_get_set_of_all_coro_parents(coro_id) -> ValueExistence[Optional[CoroID]]: 530 return try_get_set_of_all_coro_parents_on(get_available_coro_scheduler(), coro_id) 531 532 533# ================================================== 534 535 536def get_set_of_all_coros_on(prioritized_coro_scheduler: Optional[CoroSchedulerType]) -> ValueExistence[CoroID]: 537 put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 538 return put_coro.get_set_of_all_coros() 539 540 541def try_get_set_of_all_coros_on(prioritized_coro_scheduler: Optional[CoroSchedulerType]) -> 
ValueExistence[Optional[CoroID]]: 542 put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler) 543 if put_coro is not None: 544 return put_coro.get_set_of_all_coros() 545 546 547def get_set_of_all_coros() -> ValueExistence[CoroID]: 548 return get_set_of_all_coros_on(get_available_coro_scheduler()) 549 550 551def try_get_set_of_all_coros() -> ValueExistence[Optional[CoroID]]: 552 return try_get_set_of_all_coros_on(get_available_coro_scheduler())
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class Task:
    """Handle that captures the outcome of a coroutine once its wrapper is deleted.

    The task subscribes to the wrapper's "on coro del" notification. When that
    notification arrives it copies the wrapper's ``last_result`` and
    ``exception``, drops its reference to the wrapper, marks itself done and
    calls every registered done-handler with the task as the only argument.
    """

    def __init__(self, coro_wrapper: CoroWrapperBase):
        """Bind the task to ``coro_wrapper`` and subscribe to its deletion.

        Args:
            coro_wrapper: wrapper of the coroutine to observe. It must provide
                ``coro_id`` and ``add_on_coro_del_handler``.
        """
        self._coro_wrapper: Optional[CoroWrapperBase] = coro_wrapper
        self._coro_id = coro_wrapper.coro_id
        self._done: bool = False
        self._result: Any = None
        self._exception: Optional[Exception] = None
        self._on_done_handlers: Set[Callable[['Task'], None]] = set()
        # Subscribe only after every attribute exists: if the wrapper fires the
        # callback during registration, the callback must find a fully
        # initialised task and its "done" state must not be overwritten.
        coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler)

    def _on_coro_del_handler(self, coro_wrapper: CoroWrapperBase) -> bool:
        """Record the coroutine outcome and notify the done-handlers.

        Args:
            coro_wrapper: wrapper passed by the notifier. It is not read here;
                the wrapper stored at construction time is used instead.

        Returns:
            Always ``True``.
            NOTE(review): how the scheduler interprets this value is not
            visible in this file - confirm before changing it.
        """
        finished = self._coro_wrapper
        self._result = finished.last_result
        self._exception = finished.exception
        self._coro_wrapper = None
        self._done = True
        # Detach the handlers before calling them so the task does not keep
        # them (and whatever they close over) alive after completion.
        handlers, self._on_done_handlers = self._on_done_handlers, set()
        for handler in handlers:
            handler(self)

        return True

    @property
    def coro_id(self):
        """Identifier of the observed coroutine, captured at construction."""
        return self._coro_id

    @property
    def coro_wrapper(self):
        """Wrapper of the still-running coroutine.

        Raises:
            TaskIsFinished: the task is done and the wrapper was released.
        """
        if self._done:
            raise TaskIsFinished()

        return self._coro_wrapper

    @property
    def result(self):
        """Outcome of the finished coroutine.

        Returns:
            The wrapper's ``last_result`` as recorded on completion.

        Raises:
            TaskIsNotFinished: the task is not done yet.
            Exception: the recorded exception, when one was stored.
        """
        if not self._done:
            raise TaskIsNotFinished()

        if self._exception is not None:
            raise self._exception

        return self._result

    def done(self) -> bool:
        """Return ``True`` once the deletion notification has been processed."""
        return self._done

    def __bool__(self) -> bool:
        """Truth value of a task is its "done" flag."""
        return self._done

    def __nonzero__(self):
        """Python 2 spelling of ``__bool__``; kept for interface compatibility."""
        return self.__bool__()

    def add_on_done_handler(self, handler: Callable[['Task'], None]) -> bool:
        """Register ``handler`` to be called with this task when it is done.

        If the task is already done the handler is called immediately and is
        not stored. Handlers are kept in a set, so they must be hashable,
        duplicates collapse and call order is unspecified.

        Returns:
            Always ``True``.
        """
        if self._done:
            handler(self)
        else:
            self._on_done_handlers.add(handler)

        return True
def __init__(self, coro_wrapper: CoroWrapperBase):
    """Bind the task to ``coro_wrapper`` and subscribe to its deletion.

    Args:
        coro_wrapper: wrapper of the coroutine to observe. It must provide
            ``coro_id`` and ``add_on_coro_del_handler``.
    """
    self._coro_wrapper = coro_wrapper
    self._coro_id = coro_wrapper.coro_id
    self._done: bool = False
    self._result: Any = None
    self._exception: Optional[Exception] = None
    self._on_done_handlers: Set[Callable[['Task'], None]] = set()
    # Subscribe only after every attribute exists: if the wrapper fires the
    # callback during registration, the callback must find a fully
    # initialised task and its "done" state must not be overwritten.
    coro_wrapper.add_on_coro_del_handler(self._on_coro_del_handler)
class PutCoro(ServiceWithADirectRequestMixin, DualImmediateProcessingServiceMixin, TypedService[CoroID]):
    """Service that starts new coroutines on the scheduler loop.

    When tree monitoring is turned on, the service also records which
    coroutine started which, so the children and parents of a coroutine can
    be traversed later. Records of finished coroutines are dropped in
    `full_processing_iteration`.
    """

    def __init__(self, loop: CoroSchedulerType):
        """Create the empty tree state, the request dispatch table and the direct-request queue.

        Args:
            loop (CoroSchedulerType): scheduler passed on to the base service.
        """
        super(PutCoro, self).__init__(loop)
        # The three attributes below get their real values from
        # _turn_on_tree_monitoring(False) a few lines further down.
        self._tree_monitoring_turned_on: Optional[bool] = None
        self._single_task_registration_or_immediate_processing_single_impl: Optional[Callable] = None
        self._single_task_registration_or_immediate_processing_single_impl_impl: Optional[Callable] = None
        self._turn_on_tree_monitoring(False)
        self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = dict()
        self._tree_parent_by_child: Dict[CoroID, CoroID] = dict()
        # Ids collected by _on_coro_del_handler, consumed by full_processing_iteration.
        self._dead_coroutines: Set[CoroID] = set()

        # Request code -> worker. Codes 2 and 3 are saved by PutCoroRequest
        # (marked TODO there) but have no worker registered here.
        self._request_workers = {
            0: self._turn_on_tree_monitoring,
            1: self._tree_monitoring_state,
            4: self._put_background_coro,
            5: self._task,
        }

        # (coro_worker, args, kwargs) tuples queued by _add_direct_request.
        self.direct_requests: List[Tuple] = list()

    def single_task_registration_or_immediate_processing_single(
        self, coro_worker: AnyWorker, *args, **kwargs
    ) -> Tuple[bool, Optional[CoroID], Any]:
        """Start a coroutine using the currently selected (tree / no tree) implementation.

        Returns:
            Tuple[bool, Optional[CoroID], Any]: `(True, coro_id, None)` on
            success, `(True, None, exception)` when starting the coroutine raised.
        """
        try:
            coro = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
        except:
            # Deliberately broad: the error is returned in the response
            # tuple instead of propagating out of the service.
            return True, None, get_exception()

        return True, coro.coro_id, None

    def _single_task_registration_or_immediate_processing_single_impl_with_tree(
        self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine and register it as a child of the current caller coroutine."""
        return self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl(
            self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs
        )

    def _single_task_registration_or_immediate_processing_single_impl_with_tree_impl(
        self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine and record it in the tree as a child of `parent_coro_id`."""
        coro: CoroWrapperBase = self._loop.put_coro(coro_worker, *args, **kwargs)
        child_coro_id: CoroID = coro.coro_id
        self._tree_children_by_parent.setdefault(parent_coro_id, set()).add(child_coro_id)
        self._tree_parent_by_child[child_coro_id] = parent_coro_id
        # Lets the service learn about the coroutine's deletion so the tree
        # can be updated in full_processing_iteration().
        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        return coro

    def _single_task_registration_or_immediate_processing_single_impl_without_tree(
        self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine without recording any parent/child relation."""
        return self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl(
            self.current_caller_coro_info.coro_id, coro_worker, *args, **kwargs
        )

    def _single_task_registration_or_immediate_processing_single_impl_without_tree_impl(
        self, parent_coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine; `parent_coro_id` is accepted only to mirror the tree variant's signature."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)

    def _turn_on_tree_monitoring(self, turn_on: bool):
        """Select the tree-aware or the plain implementation for newly started coroutines.

        Returns:
            tuple: `(True, previous_state, None)` where `previous_state` is
            the monitoring flag as it was before this call.
        """
        previous_state: Optional[bool] = self._tree_monitoring_turned_on
        self._tree_monitoring_turned_on = turn_on
        if turn_on:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_with_tree_impl
        else:
            self._single_task_registration_or_immediate_processing_single_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree
            self._single_task_registration_or_immediate_processing_single_impl_impl = self._single_task_registration_or_immediate_processing_single_impl_without_tree_impl

        return True, previous_state, None

    def put_from_other_service(
        self, coro_id: CoroID, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine on behalf of `coro_id` (used as the parent when tree monitoring is on)."""
        return self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)

    def put_task_from_other_service(
        self, coro_id: CoroID, coro_worker: AnyWorker, args, kwargs
    ) -> Task:
        """Like `put_from_other_service`, but takes packed `args`/`kwargs` and wraps the coroutine in a `Task`."""
        coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl_impl(coro_id, coro_worker, *args, **kwargs)
        return Task(coro)

    def put_root_from_other_service(
        self, coro_worker: AnyWorker, *args, **kwargs
    ) -> CoroWrapperBase:
        """Start a coroutine directly on the loop, bypassing the tree bookkeeping."""
        return self._loop.put_coro(coro_worker, *args, **kwargs)

    def _tree_monitoring_state(self):
        """Return `(True, current_monitoring_flag, None)`."""
        return True, self._tree_monitoring_turned_on, None

    def _put_background_coro(self, coro_worker: AnyWorker, args, kwargs):
        """Start a coroutine and mark it as a background coroutine.

        Returns:
            tuple: `(True, coro_id, None)` on success, `(True, None, exception)` on failure.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = True
        except:
            # Deliberately broad: the error travels back in the response tuple.
            return True, None, get_exception()

        return True, coro.coro_id, None

    def _task(self, coro_worker: AnyWorker, args: Tuple, kwargs: Dict, background: bool):
        """Start a coroutine, set its background flag and wrap it in a `Task`.

        Returns:
            tuple: `(True, Task(coro), None)` on success, `(True, None, exception)` on failure.
        """
        try:
            coro: CoroWrapperBase = self._single_task_registration_or_immediate_processing_single_impl(coro_worker, *args, **kwargs)
            coro.is_background_coro = background
        except:
            # Deliberately broad: the error travels back in the response tuple.
            return True, None, get_exception()

        return True, Task(coro), None

    def full_processing_iteration(self):
        """Drop finished coroutines from the tree, then start all queued direct requests.

        Children of a finished coroutine are re-attached to its parent, or
        become parentless when it had none. The finished coroutine's own
        records are removed from both tree dicts so they do not accumulate.
        """
        if self._dead_coroutines:
            # Swap the set out first so the handler can keep adding to a fresh one.
            dead_coroutines_buff = self._dead_coroutines
            self._dead_coroutines = type(dead_coroutines_buff)()
            for dead_coro_id in dead_coroutines_buff:
                # pop (not get): the dead coroutine must not stay in the tree.
                parent_coro_id = self._tree_parent_by_child.pop(dead_coro_id, None)
                children = self._tree_children_by_parent.pop(dead_coro_id, None)
                if parent_coro_id is None:
                    if children is not None:
                        for child_coro_id in children:
                            self._tree_parent_by_child.pop(child_coro_id, None)
                else:
                    siblings = self._tree_children_by_parent.get(parent_coro_id)
                    if siblings is not None:
                        siblings.discard(dead_coro_id)

                    if children:
                        if siblings is None:
                            siblings = self._tree_children_by_parent[parent_coro_id] = set()

                        siblings.update(children)
                        for child_coro_id in children:
                            self._tree_parent_by_child[child_coro_id] = parent_coro_id

        if self.direct_requests:
            direct_requests_buff = self.direct_requests
            self.direct_requests = type(direct_requests_buff)()
            for coro_worker, args, kwargs in direct_requests_buff:
                try:
                    self._loop.put_coro(coro_worker, *args, **kwargs)
                except:
                    # NOTE(review): re-raising here abandons the requests that
                    # are still left in the buffer - confirm this is intended.
                    ex_type, exception, tracback = get_exception_tripple()
                    if __debug__: dlog(ex_type, exception, tracback)
                    raise

        self.make_dead()

    def _add_direct_request(self, coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[None]:
        """Queue a coroutine to be started in the next `full_processing_iteration`.

        Returns:
            `(False, None)` - no coroutine id exists yet at this point.
        """
        self.direct_requests.append((coro_worker, args, kwargs))
        self.make_live()
        return (False, None)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Remember a deleted coroutine's id for later tree cleanup; always returns False."""
        self._dead_coroutines.add(coro.coro_id)
        self.make_live()
        return False

    def travers_through_all_children(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run `KeyMultiValueTreeTraversal` over the children map starting at `coro_id`.

        Nothing is returned; results are delivered through `handler`.
        """
        t = KeyMultiValueTreeTraversal(self._tree_children_by_parent, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)

    def get_set_of_all_children(self, coro_id) -> Set[CoroID]:
        """Collect the ids reported by the children traversal of `coro_id`, excluding `coro_id` itself."""
        result = set()
        def handler(deep, parent, child: ValueExistence[CoroID], index):
            if child:
                result.add(child[1])

        self.travers_through_all_children(coro_id, handler)
        result.discard(coro_id)
        return result

    def travers_through_all_parents(self, coro_id, handler: Callable[[int, CoroID, CoroID, int], None], on_switched_to_stack_based_implementation: Optional[Callable]=None):
        """Run `KeyValueTreeTraversal` over the parent map starting at `coro_id`.

        Nothing is returned; results are delivered through `handler`.
        """
        t = KeyValueTreeTraversal(self._tree_parent_by_child, None, handler, on_switched_to_stack_based_implementation)
        t(coro_id)

    def get_set_of_all_parents(self, coro_id):
        """Collect the non-None parents reported by the parents traversal of `coro_id`, excluding `coro_id` itself."""
        result = set()
        def handler(deep, child, parent, index):
            if parent is not None:
                result.add(parent)

        self.travers_through_all_parents(coro_id, handler)
        result.discard(coro_id)
        return result

    def get_set_of_all_coros(self):
        """Return every coroutine id that is currently a key of either tree map."""
        result = set(self._tree_children_by_parent.keys())
        result.update(self._tree_parent_by_child.keys())
        return result

    def in_work(self) -> bool:
        """Report pending work (queued direct requests or dead coroutines) through `thrifty_in_work`."""
        result: bool = bool(self.direct_requests) or bool(self._dead_coroutines)
        return self.thrifty_in_work(result)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
def __init__(self, loop: CoroSchedulerType):
    """Create the empty tree state, the request dispatch table and the direct-request queue.

    Args:
        loop (CoroSchedulerType): scheduler passed on to the base service.
    """
    super(PutCoro, self).__init__(loop)

    # Placeholders; _turn_on_tree_monitoring(False) assigns the real values.
    self._tree_monitoring_turned_on: Optional[bool] = None
    self._single_task_registration_or_immediate_processing_single_impl: Optional[Callable] = None
    self._single_task_registration_or_immediate_processing_single_impl_impl: Optional[Callable] = None
    self._turn_on_tree_monitoring(False)

    # Parent/child bookkeeping used when tree monitoring is on.
    self._tree_children_by_parent: Dict[CoroID, Set[CoroID]] = {}
    self._tree_parent_by_child: Dict[CoroID, CoroID] = {}
    self._dead_coroutines: Set[CoroID] = set()

    # Request code -> worker method.
    self._request_workers = {
        0: self._turn_on_tree_monitoring,
        1: self._tree_monitoring_state,
        4: self._put_background_coro,
        5: self._task,
    }

    # (coro_worker, args, kwargs) tuples waiting for full_processing_iteration().
    self.direct_requests: List[Tuple] = []
def full_processing_iteration(self):
    """Drop finished coroutines from the tree, then start all queued direct requests.

    Children of a finished coroutine are re-attached to its parent, or
    become parentless when it had none. The finished coroutine's own records
    are removed from both tree dicts so they do not accumulate.
    """
    if self._dead_coroutines:
        # Swap the set out first so the handler can keep adding to a fresh one.
        dead_coroutines_buff = self._dead_coroutines
        self._dead_coroutines = type(dead_coroutines_buff)()
        for dead_coro_id in dead_coroutines_buff:
            # pop (not get): the dead coroutine must not stay in the tree.
            parent_coro_id = self._tree_parent_by_child.pop(dead_coro_id, None)
            children = self._tree_children_by_parent.pop(dead_coro_id, None)
            if parent_coro_id is None:
                if children is not None:
                    for child_coro_id in children:
                        self._tree_parent_by_child.pop(child_coro_id, None)
            else:
                siblings = self._tree_children_by_parent.get(parent_coro_id)
                if siblings is not None:
                    siblings.discard(dead_coro_id)

                if children:
                    if siblings is None:
                        siblings = self._tree_children_by_parent[parent_coro_id] = set()

                    siblings.update(children)
                    for child_coro_id in children:
                        self._tree_parent_by_child[child_coro_id] = parent_coro_id

    if self.direct_requests:
        direct_requests_buff = self.direct_requests
        self.direct_requests = type(direct_requests_buff)()
        for coro_worker, args, kwargs in direct_requests_buff:
            try:
                self._loop.put_coro(coro_worker, *args, **kwargs)
            except:
                # NOTE(review): re-raising here abandons the requests that
                # are still left in the buffer - confirm this is intended.
                ex_type, exception, tracback = get_exception_tripple()
                if __debug__: dlog(ex_type, exception, tracback)
                raise

    self.make_dead()
def travers_through_all_children(
    self,
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
):
    """Run `KeyMultiValueTreeTraversal` over the children map starting at `coro_id`.

    Nothing is returned; results are delivered through `handler`.
    """
    KeyMultiValueTreeTraversal(
        self._tree_children_by_parent,
        None,
        handler,
        on_switched_to_stack_based_implementation,
    )(coro_id)
def in_work(self) -> bool:
    """Report pending work (queued direct requests or dead coroutines) through `thrifty_in_work`."""
    has_pending: bool = bool(self.direct_requests or self._dead_coroutines)
    return self.thrifty_in_work(has_pending)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
- single_task_registration_or_immediate_processing
- single_task_registration_or_immediate_processing_multiple
- single_task_registration_or_immediate_processing_single
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
class PutCoroRequest(ServiceRequest):
    """Request builders for the PutCoro service.

    Every method stores a numeric request code followed by its arguments via
    `_save` and returns whatever `_save` returns.
    """

    def turn_on_tree_monitoring(self, turn_on: bool) -> bool:
        """Request code 0: switch parent/child tree monitoring on or off."""
        request_code = 0
        return self._save(request_code, turn_on)

    def tree_monitoring_state(self) -> bool:
        """Request code 1: query the current tree monitoring flag."""
        request_code = 1
        return self._save(request_code)

    # TODO: An implementation required:
    def set_on_children_start_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request code 2: register `handler` for children starting under `parent_coro_id`."""
        request_code = 2
        return self._save(request_code, parent_coro_id, handler)

    # TODO: An implementation required:
    def set_on_children_del_handler(self, parent_coro_id: CoroID, handler: Callable) -> ServiceRequest:
        """Request code 3: register `handler` for children deleted under `parent_coro_id`."""
        request_code = 3
        return self._save(request_code, parent_coro_id, handler)

    def put_background_coro(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request code 4: start `coro_worker`; `args`/`kwargs` are stored packed."""
        request_code = 4
        return self._save(request_code, coro_worker, args, kwargs)

    def task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request code 5 with the background flag set to False."""
        request_code, background = 5, False
        return self._save(request_code, coro_worker, args, kwargs, background)

    def background_task(self, coro_worker: AnyWorker, *args, **kwargs) -> ServiceRequest:
        """Request code 5 with the background flag set to True."""
        request_code, background = 5, True
        return self._save(request_code, coro_worker, args, kwargs, background)
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
def put_current_from_other_service(current_service: Service, coro_worker: AnyWorker, *args, **kwargs) -> CoroWrapperBase:
    """Start `coro_worker` from inside another service, on behalf of that service's current caller coroutine.

    Args:
        current_service (Service): the service making the call; its loop and
            its current caller coroutine id are used.
        coro_worker (AnyWorker): worker to start; `*args`/`**kwargs` are passed along with it.

    Returns:
        CoroWrapperBase: whatever `PutCoro.put_from_other_service` returns.
    """
    caller_coro_id = current_service.current_caller_coro_info.coro_id
    put_coro_service: PutCoro = current_service._loop.get_service_instance(PutCoro)
    return put_coro_service.put_from_other_service(caller_coro_id, coro_worker, *args, **kwargs)
def put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Request creation of a new coroutine from `coro_worker` via the PutCoro service.

    context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

    An example:

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            try:
                result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
                if result:
                    print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                    resulting_coro_id = result.value
                else:
                    print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
            except CoroSchedulerContextIsNotAvailable:
                print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): scheduler context, see above.
        coro_worker (AnyWorker): worker the new coroutine is created from.
        *args, **kwargs: passed to the PutCoro service together with `coro_worker`.

    Returns:
        ValueExistence[Optional[CoroID]]: result of `make_request_to_service_with_context`;
        per the example above it is truthy and carries the new coroutine id
        in `.value` when the coroutine has already been created.
    """
    return make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
Request creation of a new coroutine from `coro_worker` via the PutCoro service. `context` can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module
An example:
from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import put_coro_to
from typing import Optional, Union
def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
resulting_coro_id: Optional[CoroID] = None
try:
result = put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
if result:
print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
resulting_coro_id = result.value
else:
print('We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
except CoroSchedulerContextIsNotAvailable:
print('We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None')
return resulting_coro_id
Args: context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ coro_worker (AnyWorker): _description_
Returns: ValueExistence[Optional[CoroID]]: _description_
def try_put_coro_to(context: Tuple[Optional[CoroSchedulerType], Optional[Interface], bool], coro_worker: AnyWorker, *args, **kwargs) -> ValueExistence[Optional[CoroID]]:
    """Try to request creation of a new coroutine from `coro_worker` via the PutCoro service.

    context can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module

    An example:

        from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
        from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to
        from typing import Optional, Union

        def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
            resulting_coro_id: Optional[CoroID] = None
            result = try_put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
            if result:
                print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
                resulting_coro_id = result.value
            else:
                print('There are two possibilities:')
                print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
                print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.')

            return resulting_coro_id

    Args:
        context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): scheduler context, see above.
        coro_worker (AnyWorker): worker the new coroutine is created from.
        *args, **kwargs: passed to the PutCoro service together with `coro_worker`.

    Returns:
        ValueExistence[Optional[CoroID]]: result of `try_make_request_to_service_with_context`;
        per the example above it is truthy and carries the new coroutine id
        in `.value` when the coroutine has already been created.
    """
    return try_make_request_to_service_with_context(context, PutCoro, coro_worker, *args, **kwargs)
Try to request creation of a new coroutine from `coro_worker` via the PutCoro service. `context` can be generated by one of the [interface_and_loop_with_backup_loop, get_interface_and_loop_with_backup_loop, interface_and_loop_with_explicit_loop, get_interface_and_loop_with_explicit_loop, interface_for_an_explicit_loop, get_interface_for_an_explicit_loop] functions from the cengal/parallel_execution/coroutines/coro_scheduler module
An example:
from cengal.parallel_execution.coroutines.coro_scheduler import get_interface_and_loop_with_explicit_loop, CoroSchedulerType, ExplicitWorker, Worker, CoroID
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import try_put_coro_to
from typing import Optional, Union
def my_func(loop: CoroSchedulerType, coro_worker: AnyWorker, a, b) -> Optional[CoroID]:
resulting_coro_id: Optional[CoroID] = None
result = try_put_coro_to(get_interface_and_loop_with_explicit_loop(loop), coro_worker, a, b)
if result:
print(f'We are inside of the loop AND in the coroutine. Our requested coro already was created at this moment. Coro id: {result.value}')
resulting_coro_id = result.value
else:
print('There are two possibilities:')
print('1) We are outside of the loop or not in the coroutine body. Our requested coroutine will be created in the near future.')
print('2) We are outside of the loop AND no loop was selected as a Primary AND our given `loop` var is None. Our requested coroutine will NOT be created at all.')
return resulting_coro_id
Args: context (Tuple[Optional[CoroSchedulerType], Optional[Interface], bool]): _description_ coro_worker (AnyWorker): _description_
Returns: ValueExistence[Optional[CoroID]]: _description_
def travers_through_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the children of `coro_id` using the PutCoro service of the given scheduler.

    The service is obtained with `service_with_explicit_loop`. Results are
    delivered through `handler`; `PutCoro.travers_through_all_children`
    returns nothing, so the return value is None.
    """
    put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return put_coro.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation)
def try_travers_through_all_coro_children_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the children of `coro_id` if a PutCoro service can be obtained; otherwise do nothing.

    The service is looked up with `get_service_with_explicit_loop`; when it
    yields None the call is a no-op. Results are delivered through `handler`
    and the return value is always None.
    """
    put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if put_coro is not None:
        return put_coro.travers_through_all_children(coro_id, handler, on_switched_to_stack_based_implementation)

    return None
def travers_through_all_coro_children(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the children of `coro_id` on the scheduler returned by `get_available_coro_scheduler()`.

    Delegates to `travers_through_all_coro_children_on`; results are
    delivered through `handler`.
    """
    return travers_through_all_coro_children_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation)
def try_travers_through_all_coro_children(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the children of `coro_id` on the scheduler returned by `get_available_coro_scheduler()`, if possible.

    Delegates to `try_travers_through_all_coro_children_on`; results are
    delivered through `handler`.
    """
    return try_travers_through_all_coro_children_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation)
def try_get_set_of_all_coro_children_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Optional[Set[CoroID]]:
    """Return the set of all children of `coro_id`, or None when no PutCoro service could be obtained."""
    service: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if service is None:
        return None

    return service.get_set_of_all_children(coro_id)
def travers_through_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the parents of `coro_id` using the PutCoro service of the given scheduler.

    The service is obtained with `service_with_explicit_loop`. Results are
    delivered through `handler`; the wrapped
    `PutCoro.travers_through_all_parents` call yields no value, so the
    return value is None.
    """
    put_coro: PutCoro = service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    return put_coro.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation)
def try_travers_through_all_coro_parents_on(
    prioritized_coro_scheduler: Optional[CoroSchedulerType],
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the parents of `coro_id` if a PutCoro service can be obtained; otherwise do nothing.

    The service is looked up with `get_service_with_explicit_loop`; when it
    yields None the call is a no-op. Results are delivered through `handler`
    and the return value is always None.
    """
    put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if put_coro is not None:
        return put_coro.travers_through_all_parents(coro_id, handler, on_switched_to_stack_based_implementation)

    return None
def travers_through_all_coro_parents(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the parents of `coro_id` on the scheduler returned by `get_available_coro_scheduler()`.

    Delegates to `travers_through_all_coro_parents_on`; results are
    delivered through `handler`.
    """
    return travers_through_all_coro_parents_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation)
def try_travers_through_all_coro_parents(
    coro_id,
    handler: Callable[[int, CoroID, CoroID, int], None],
    on_switched_to_stack_based_implementation: Optional[Callable]=None,
) -> None:
    """Traverse the parents of `coro_id` on the scheduler returned by `get_available_coro_scheduler()`, if possible.

    Delegates to `try_travers_through_all_coro_parents_on`; results are
    delivered through `handler`.
    """
    return try_travers_through_all_coro_parents_on(get_available_coro_scheduler(), coro_id, handler, on_switched_to_stack_based_implementation)
def try_get_set_of_all_coro_parents_on(prioritized_coro_scheduler: Optional[CoroSchedulerType], coro_id) -> Optional[Set[CoroID]]:
    """Return the set of all parents of `coro_id`, or None when no PutCoro service could be obtained.

    The service is looked up with `get_service_with_explicit_loop`; the set
    itself comes from `PutCoro.get_set_of_all_parents`.
    """
    put_coro: PutCoro = get_service_with_explicit_loop(PutCoro, prioritized_coro_scheduler)
    if put_coro is not None:
        return put_coro.get_set_of_all_parents(coro_id)

    return None