#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Tools for waiting on groups of coroutine tasks and terminating the leftovers.

Module: cengal.parallel_execution.coroutines.coro_tools.wait_tasks.versions.v_0.wait_tasks

Each waiting strategy is exposed in four flavours:

* ``await_*_explicit`` - async function, the interface is passed in;
* ``await_*_implicit`` - async function, uses ``current_interface()``;
* ``wait_*_explicit`` - plain function that runs the async version through
  ``RunCoro`` on the given interface;
* ``wait_*_implicit`` - plain function, uses ``current_interface()``.

The short names (``await_tasks``, ``wait_tasks_im``, ...) are aliases of the
explicit / implicit variants.
"""


# __all__ = []
__all__ = [
    'WaitType',
    'TasksListIsEmptyError',
    'IsNotDoneYetError',
    'NormalOnDoneHandler',
    'NormalInfo',
    'FastestOnDoneHandler',
    'FastestInfo',
    'FastestSuccessfulOnDoneHandler',
    'FastestSuccessfulInfo',
    'FastestExceptionOnDoneHandler',
    'FastestExceptionInfo',
    'FastestCustomOnDoneHandler',
    'FastestCustomInfo',
    'AtomicOnDoneHandler',
    'AtomicInfo',
    'AtomicCustomOnDoneHandler',
    'success_result_criteria_handler',
    'exception_result_criteria_handler',
    'same_first_failed_result_formatter',
    'none_first_failed_result_formatter',
    'AtomicCustomInfo',
    'GracefulTerminationSettings',
    'TerminationReason',
    'atask_graceful_destroyer',
    'aterminate_tasks_explicit',
    'aterminate_tasks_implicit',
    'aterminate_tasks',
    'aterminate_tasks_im',
    'terminate_tasks_explicit',
    'terminate_tasks_implicit',
    'terminate_tasks',
    'terminate_tasks_im',
    'await_tasks_explicit',
    'await_tasks_implicit',
    'await_tasks',
    'await_tasks_im',
    'wait_tasks_explicit',
    'wait_tasks_implicit',
    'wait_tasks',
    'wait_tasks_im',
    'await_tasks_fastest_explicit',
    'await_tasks_fastest_implicit',
    'await_tasks_fastest',
    'await_tasks_fastest_im',
    'wait_tasks_fastest_explicit',
    'wait_tasks_fastest_implicit',
    'wait_tasks_fastest',
    'wait_tasks_fastest_im',
    'await_tasks_fastest_successful_explicit',
    'await_tasks_fastest_successful_implicit',
    'await_tasks_fastest_successful',
    'await_tasks_fastest_successful_im',
    'wait_tasks_fastest_successful_explicit',
    'wait_tasks_fastest_successful_implicit',
    'wait_tasks_fastest_successful',
    'wait_tasks_fastest_successful_im',
    'await_tasks_fastest_exception_explicit',
    'await_tasks_fastest_exception_implicit',
    'await_tasks_fastest_exception',
    'await_tasks_fastest_exception_im',
    'wait_tasks_fastest_exception_explicit',
    'wait_tasks_fastest_exception_implicit',
    'wait_tasks_fastest_exception',
    'wait_tasks_fastest_exception_im',
    'await_tasks_fastest_custom_explicit',
    'await_tasks_fastest_custom_implicit',
    'await_tasks_fastest_custom',
    'await_tasks_fastest_custom_im',
    'wait_tasks_fastest_custom_explicit',
    'wait_tasks_fastest_custom_implicit',
    'wait_tasks_fastest_custom',
    'wait_tasks_fastest_custom_im',
    'await_tasks_atomic_explicit',
    'await_tasks_atomic_implicit',
    'await_tasks_atomic',
    'await_tasks_atomic_im',
    'wait_tasks_atomic_explicit',
    'wait_tasks_atomic_implicit',
    'wait_tasks_atomic',
    'wait_tasks_atomic_im',
    'await_tasks_atomic_custom_explicit',
    'await_tasks_atomic_custom_implicit',
    'await_tasks_atomic_custom',
    'await_tasks_atomic_custom_im',
    'wait_tasks_atomic_custom_explicit',
    'wait_tasks_atomic_custom_implicit',
    'wait_tasks_atomic_custom',
    'wait_tasks_atomic_custom_im',
    'wait_task_explicit',
    'wait_task_implicit',
    'wait_task',
    'wait_task_im',
    'await_task_explicit',
    'await_task_implicit',
    'await_task',
    'await_task_im',
]


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


from cengal.parallel_execution.coroutines.coro_scheduler import Interface, current_interface, CoroID, get_interface_and_loop_with_explicit_loop
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import Task
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PutCoroList, PSCP
from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list import KillCoroList, KillSingleCoroParams
from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import TimeoutError, SubTimeoutError
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import timer_func_run_on
from cengal.parallel_execution.coroutines.coro_tools.coro_flow_control import agraceful_coro_destroyer
from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, AsyncEventBus
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import put_request_to_service

from uuid import uuid4
from enum import IntEnum
from typing import Optional, List, Union, Any, Type, Tuple, Dict, Callable


# One finished task: (coro_id, result, exception); exception is None on success.
_ResultInfo = Tuple[CoroID, Any, Optional[Exception]]


class WaitType(IntEnum):
    """Identifiers of the waiting strategies offered by this module."""

    normal = 0
    fastest = 1
    fastest_successful = 2
    fastest_exception = 3
    fastest_custom = 4
    atomic = 5
    # Used to be 5 as well, which silently made it an alias of `atomic`
    # (IntEnum members with equal values are the same member).
    atomic_custom = 6


class TasksListIsEmptyError(Exception):
    """Raised when a wait is requested for an empty list of tasks."""


class IsNotDoneYetError(Exception):
    """Raised by ``gather()`` when the wait has not completed yet."""


class NormalOnDoneHandler:
    """On-done callback that records a finished task in a ``NormalInfo``."""

    def __init__(self, normal_info: 'NormalInfo') -> None:
        self.normal_info: 'NormalInfo' = normal_info

    def __call__(self, task: Task) -> Any:
        self.normal_info.add_result(task.coro_id, task._result, task._exception)


class NormalInfo:
    """State of a wait that completes when every task has finished.

    Results are collected through ``add_result``. Once ``done`` becomes true
    the object stops accepting results and sends ``wait_done_id`` to the
    ``AsyncEventBus`` service, which is the event id the ``await_*`` functions
    of this module wait on.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        self.tasks: List[Task] = tasks
        # Number of collected results that completes the wait.
        self.coroutines_num: int = len(tasks)
        self.results: List[Optional[_ResultInfo]] = list()
        self.wait_done_id: str = wait_done_id
        # When true, add_result() drops everything it is given.
        self.ignore: bool = False
        self._done: bool = False
        self._timeout: bool = False
        self._failure: bool = False

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        """Record one finished task; complete the wait when all are in."""
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if len(self.results) >= self.coroutines_num:
            self.done = True

    @property
    def failure(self) -> bool:
        return self._failure

    @failure.setter
    def failure(self, value: bool) -> None:
        self._failure = value

    @property
    def done(self) -> bool:
        return self._done

    @done.setter
    def done(self, value: bool) -> None:
        # Latching: once done, further assignments are ignored so the
        # completion event is sent only once.
        if self._done:
            return

        self._done = value
        if value:
            self.ignore = True
            put_request_to_service(AsyncEventBus, self.wait_done_id, None, CoroPriority.high)

    @property
    def timeout(self) -> bool:
        return self._timeout

    def on_timeout(self) -> None:
        """Timer callback: complete the wait and flag it as timed out.

        Does nothing when the wait is already complete, so a timer firing
        after completion cannot relabel the outcome as a timeout.
        """
        if self._done:
            return

        self._timeout = True
        self.done = True

    def gather(self) -> List[_ResultInfo]:
        """Return the collected results.

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        return self.results


# =================================================================================


class FastestOnDoneHandler:
    """On-done callback that records a finished task in a ``FastestInfo``."""

    def __init__(self, fastest_info: 'FastestInfo') -> None:
        self.fastest_info: 'FastestInfo' = fastest_info

    def __call__(self, task: Task) -> Any:
        # add_result() takes three positional arguments, not a single tuple.
        self.fastest_info.add_result(task.coro_id, task._result, task._exception)


class FastestInfo(NormalInfo):
    """Wait that completes as soon as the first task finishes."""

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        self.coroutines_num: int = 1


# =================================================================================


class FastestSuccessfulOnDoneHandler:
    """On-done callback that records a finished task in a ``FastestSuccessfulInfo``."""

    def __init__(self, fastest_successful_info: 'FastestSuccessfulInfo') -> None:
        self.fastest_successful_info: 'FastestSuccessfulInfo' = fastest_successful_info

    def __call__(self, task: Task) -> Any:
        # add_result() takes three positional arguments, not a single tuple.
        self.fastest_successful_info.add_result(task.coro_id, task._result, task._exception)


class FastestSuccessfulInfo(NormalInfo):
    """Wait that completes on the first task that finishes without an exception.

    It also completes when every task has finished, even if none succeeded.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        self.fastest_successful_result: Optional[_ResultInfo] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if exception is None:
            self.fastest_successful_result = (coro_id, result, exception)
            self.done = True
        elif len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Optional[_ResultInfo]]:
        """Return a one-item list holding the winning result, or ``[None]``.

        Sets ``failure`` when no task succeeded.

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        if self.fastest_successful_result is None:
            self.failure = True

        return [self.fastest_successful_result]


# =================================================================================


class FastestExceptionOnDoneHandler:
    """On-done callback that records a finished task in a ``FastestExceptionInfo``."""

    def __init__(self, fastest_exception_info: 'FastestExceptionInfo') -> None:
        self.fastest_exception_info: 'FastestExceptionInfo' = fastest_exception_info

    def __call__(self, task: Task) -> Any:
        # add_result() takes three positional arguments, not a single tuple.
        self.fastest_exception_info.add_result(task.coro_id, task._result, task._exception)


class FastestExceptionInfo(NormalInfo):
    """Wait that completes on the first task that finishes with an exception.

    It also completes when every task has finished, even if none raised.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        self.fastest_exception_result: Optional[_ResultInfo] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if exception is not None:
            self.fastest_exception_result = (coro_id, result, exception)
            self.done = True
        elif len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Optional[_ResultInfo]]:
        """Return a one-item list holding the winning result, or ``[None]``.

        Sets ``failure`` when no task finished with an exception.

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        if self.fastest_exception_result is None:
            self.failure = True

        return [self.fastest_exception_result]


# =================================================================================


class FastestCustomOnDoneHandler:
    """On-done callback that records a finished task in a ``FastestCustomInfo``."""

    def __init__(self, fastest_custom_info: 'FastestCustomInfo') -> None:
        self.fastest_custom_info: 'FastestCustomInfo' = fastest_custom_info

    def __call__(self, task: Task) -> Any:
        # add_result() takes three positional arguments, not a single tuple.
        self.fastest_custom_info.add_result(task.coro_id, task._result, task._exception)


class FastestCustomInfo(NormalInfo):
    """Wait that completes on the first result accepted by ``result_criteria_handler``.

    It also completes when every task has finished, even if none was accepted.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
        super().__init__(wait_done_id, tasks)
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
        self.fastest_custom_result: Optional[_ResultInfo] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if self.result_criteria_handler(coro_id, result, exception):
            self.fastest_custom_result = (coro_id, result, exception)
            self.done = True
        elif len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Optional[_ResultInfo]]:
        """Return a one-item list holding the winning result, or ``[None]``.

        Sets ``failure`` when no result was accepted.

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        if self.fastest_custom_result is None:
            self.failure = True

        return [self.fastest_custom_result]


# =================================================================================


def _normalize_results(tasks: List[Task], results: List[Optional[_ResultInfo]]) -> List[Optional[_ResultInfo]]:
    """Reorder collected results to follow the order of ``tasks``.

    ``None`` entries in ``results`` are skipped. A task with no collected
    result gets ``None`` at its position in the returned list.
    """
    provided_results: Dict[CoroID, Tuple[Any, Optional[Exception]]] = {
        result_info[0]: (result_info[1], result_info[2])
        for result_info in results
        if result_info is not None
    }
    normalized_results: List[Optional[_ResultInfo]] = list()
    for task in tasks:
        coro_id = task.coro_id
        if coro_id in provided_results:
            result, exception = provided_results[coro_id]
            normalized_results.append((coro_id, result, exception))
        else:
            normalized_results.append(None)

    return normalized_results


class AtomicOnDoneHandler:
    """On-done callback that records a finished task in an ``AtomicInfo``."""

    def __init__(self, atomic_info: 'AtomicInfo') -> None:
        self.atomic_info: 'AtomicInfo' = atomic_info

    def __call__(self, task: Task) -> Any:
        self.atomic_info.add_result(task.coro_id, task._result, task._exception)


class AtomicInfo(NormalInfo):
    """Wait that completes when all tasks succeed or when the first one fails.

    The first exception sets ``failure`` and completes the wait. It is stored
    among the results only when ``include_first_failure`` is true.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task], include_first_failure: bool = True) -> None:
        super().__init__(wait_done_id, tasks)
        self.include_first_failure: bool = include_first_failure

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        if self.ignore:
            return

        if exception is not None:
            if self.include_first_failure:
                self.results.append((coro_id, result, exception))

            self.failure = True
            self.done = True
        else:
            self.results.append((coro_id, result, exception))

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Optional[_ResultInfo]]:
        """Return one entry per task, in task order (``None`` where no result was stored).

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        return _normalize_results(self.tasks, self.results)


# =================================================================================


class AtomicCustomOnDoneHandler:
    """On-done callback that records a finished task in an ``AtomicCustomInfo``."""

    def __init__(self, atomic_custom_info: 'AtomicCustomInfo') -> None:
        self.atomic_custom_info: 'AtomicCustomInfo' = atomic_custom_info

    def __call__(self, task: Task) -> Any:
        self.atomic_custom_info.add_result(task.coro_id, task._result, task._exception)


def success_result_criteria_handler(coro_id: CoroID, result: Any, exception: Optional[Exception]) -> bool:
    """Accept a result when the task finished without an exception."""
    return exception is None


def exception_result_criteria_handler(coro_id: CoroID, result: Any, exception: Optional[Exception]) -> bool:
    """Accept a result when the task finished with an exception."""
    return exception is not None


def same_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Optional[Exception]) -> Any:
    """Keep the rejected result as an ordinary ``(coro_id, result, exception)`` tuple."""
    return (coro_id, result, exception)


def none_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Optional[Exception]) -> Any:
    """Replace the rejected result with ``None``."""
    return None


class AtomicCustomInfo(NormalInfo):
    """Atomic wait whose notion of failure is defined by ``result_criteria_handler``.

    The first result rejected by the handler sets ``failure`` and completes
    the wait. What is stored for it is decided by
    ``first_failed_result_formatter`` (``none_first_failed_result_formatter``
    by default).
    """

    def __init__(
            self,
            wait_done_id: str,
            tasks: List[Task],
            result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
            first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    ) -> None:
        super().__init__(wait_done_id, tasks)
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
        self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = \
            none_first_failed_result_formatter if first_failed_result_formatter is None else first_failed_result_formatter

    def add_result(self, coro_id: CoroID, result: Any, exception: Optional[Exception]) -> None:
        # A leftover unconditional append + done check used to precede this
        # logic; it stored every result twice (finishing the wait after half
        # of the tasks) and bypassed the criteria handler.
        if self.ignore:
            return

        if self.result_criteria_handler(coro_id, result, exception):
            self.results.append((coro_id, result, exception))
        else:
            # Store the formatted entry before signalling completion.
            self.results.append(self.first_failed_result_formatter(coro_id, result, exception))
            self.failure = True
            self.done = True

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Optional[_ResultInfo]]:
        """Return one entry per task, in task order (``None`` where no result was stored).

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        return _normalize_results(self.tasks, self.results)


# =================================================================================


class GracefulTerminationSettings:
    """Parameters forwarded to ``agraceful_coro_destroyer`` by ``atask_graceful_destroyer``."""

    def __init__(self,
                 phase_time_limit: Optional[float],
                 ex_type: Optional[Type[Exception]] = None, ex_value: Optional[Exception] = None, ex_traceback: Any = None,
                 first_phase_is_wait: bool = True,
                 last_phase_is_kill: bool = True,
                 ) -> None:
        self.phase_time_limit: Optional[float] = phase_time_limit
        self.ex_type: Optional[Type[Exception]] = ex_type
        self.ex_value: Optional[Exception] = ex_value
        self.ex_traceback: Any = ex_traceback
        self.first_phase_is_wait: bool = first_phase_is_wait
        self.last_phase_is_kill: bool = last_phase_is_kill


class TerminationReason(IntEnum):
    """Outcome of a wait; selects which ``terminate_other_tasks_on_*`` flag applies."""

    success = 0
    failure = 1
    timeout = 2


async def atask_graceful_destroyer(
        i: Interface,
        task: Task,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        ) -> None:
    """Gracefully destroy ``task`` through ``agraceful_coro_destroyer``.

    ``graceful_termination_settings`` is dereferenced unconditionally, so
    despite its default it must not be ``None`` (``AttributeError`` otherwise).
    """
    await agraceful_coro_destroyer(
        i,
        graceful_termination_settings.phase_time_limit,
        task.coro_id,
        graceful_termination_settings.ex_type,
        graceful_termination_settings.ex_value,
        graceful_termination_settings.ex_traceback,
        tree,
        graceful_termination_settings.first_phase_is_wait,
        graceful_termination_settings.last_phase_is_kill,
    )


async def aterminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Terminate the tasks that are still running, if the flags ask for it.

    Args:
        i: interface used to issue the service requests.
        tasks: candidate tasks; those whose ``done`` is truthy are skipped.
        termination_reason: outcome of the preceding wait; termination happens
            only if the matching ``terminate_other_tasks_on_*`` flag is set.
        tree: forwarded to ``KillSingleCoroParams`` / ``atask_graceful_destroyer``.
        graceful_termination_settings: ``None`` kills the tasks through
            ``KillCoroList``; otherwise one ``atask_graceful_destroyer``
            coroutine per task is started through ``PutCoroList``.
        terminate_other_tasks: when not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags.
        wait_for_termination: graceful mode only - wait for the destroyer
            coroutines to finish.

    Returns:
        ``True`` if termination was requested for at least one task,
        ``False`` otherwise.
    """
    if not tasks:
        return False

    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    termination_requested: bool = \
        ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
        ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout)
    if not termination_requested:
        return False

    unfinished_tasks: List[Task] = [task for task in tasks if not task.done]
    if not unfinished_tasks:
        return False

    if graceful_termination_settings is None:
        kill_params: List[KillSingleCoroParams] = [KillSingleCoroParams(task.coro_id, tree) for task in unfinished_tasks]
        # Previously referenced the unbound name `termination_tasks` and was
        # not awaited, unlike every other service request in async code here.
        await i(KillCoroList, kill_params)
    else:
        destroyer_params: List[PSCP] = [
            PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings)
            for task in unfinished_tasks
        ]
        termination_tasks: List[Task] = await i(PutCoroList, destroyer_params, True)
        if wait_for_termination:
            await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)

    return True


async def aterminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Same as ``aterminate_tasks_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


aterminate_tasks: Callable = aterminate_tasks_explicit
aterminate_tasks_im: Callable = aterminate_tasks_implicit


def terminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Run ``aterminate_tasks_explicit`` through ``RunCoro`` and return its result."""
    # The result used to be dropped, so callers always got None.
    return i(RunCoro, aterminate_tasks_explicit,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


def terminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Same as ``terminate_tasks_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return terminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


terminate_tasks: Callable = terminate_tasks_explicit
terminate_tasks_im: Callable = terminate_tasks_implicit


# =================================================================================


def _as_task_list(tasks: Union[Task, List[Task]]) -> List[Task]:
    """Wrap a single task into a list and reject an empty list of tasks."""
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    return tasks


async def _await_completion(
        i: Interface,
        info: NormalInfo,
        on_done_handler_type: Callable[[Any], Callable[[Task], Any]],
        timeout: Optional[float],
        ) -> Tuple[List[Optional[_ResultInfo]], TerminationReason]:
    """Wait until ``info`` is done; return its results and the termination reason."""
    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for another_task in info.tasks:
        another_task.add_on_done_handler(on_done_handler_type(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(info.wait_done_id))
    # gather() may set info.failure, so the flags are read only afterwards.
    results: List[Optional[_ResultInfo]] = info.gather()
    if info.timeout:
        termination_reason: TerminationReason = TerminationReason.timeout
    elif info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    return results, termination_reason


def _unpack_single_result(results: List[Optional[_ResultInfo]]) -> Tuple[Optional[CoroID], Any]:
    """Turn the one-item result list of a "fastest" wait into ``(coro_id, result)``.

    Returns ``(None, None)`` when there is no winner (empty list or ``[None]``)
    and re-raises the winner's exception if it has one; a ``TimeoutError``
    raised by a task with a known coro id is replaced by ``SubTimeoutError``.
    """
    if (not results) or (results[0] is None):
        return None, None

    coro_id, result, exception = results[0]
    if exception is not None:
        if isinstance(exception, TimeoutError) and (coro_id is not None):
            exception = SubTimeoutError()

        raise exception

    return coro_id, result


async def await_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait until all ``tasks`` have finished or ``timeout`` fires.

    Afterwards the tasks are handed to ``aterminate_tasks_explicit`` together
    with the termination flags.

    Returns:
        ``(coro_id, result, exception)`` for every task that finished, in
        completion order (possibly partial after a timeout).

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    tasks = _as_task_list(tasks)
    normal_wait_id: str = f'wait_task__normal__{uuid4()}'
    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)
    results, termination_reason = await _await_completion(i, normal_info, NormalOnDoneHandler, timeout)
    # Must be the async variant: the sync wrapper returns a non-awaitable.
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results


async def await_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Same as ``await_tasks_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return await await_tasks_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


await_tasks: Callable = await_tasks_explicit
await_tasks_im: Callable = await_tasks_implicit


def wait_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_explicit`` through ``RunCoro`` and return its result."""
    # The result used to be dropped, so callers always got None.
    return i(RunCoro, await_tasks_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


def wait_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Same as ``wait_tasks_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return wait_tasks_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


wait_tasks: Callable = wait_tasks_explicit
wait_tasks_im: Callable = wait_tasks_implicit


# =================================================================================


async def await_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the first of ``tasks`` to finish or for ``timeout`` to fire.

    Afterwards the tasks are handed to ``aterminate_tasks_explicit`` together
    with the termination flags.

    Returns:
        ``(coro_id, result)`` of the first finished task, or ``(None, None)``
        when nothing finished before the timeout.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        Exception: the exception of the first finished task, if it has one
            (``TimeoutError`` is replaced by ``SubTimeoutError``).
    """
    tasks = _as_task_list(tasks)
    fastest_wait_id: str = f'wait_task__fastest__{uuid4()}'
    fastest_info: FastestInfo = FastestInfo(fastest_wait_id, tasks)
    results, termination_reason = await _await_completion(i, fastest_info, FastestOnDoneHandler, timeout)
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return _unpack_single_result(results)


async def await_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Same as ``await_tasks_fastest_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return await await_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


await_tasks_fastest: Callable = await_tasks_fastest_explicit
await_tasks_fastest_im: Callable = await_tasks_fastest_implicit


def wait_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_explicit`` through ``RunCoro`` and return its result."""
    # The result used to be dropped, so callers always got None.
    return i(RunCoro, await_tasks_fastest_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


def wait_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Same as ``wait_tasks_fastest_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return wait_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


wait_tasks_fastest: Callable = wait_tasks_fastest_explicit
wait_tasks_fastest_im: Callable = wait_tasks_fastest_implicit


# =================================================================================


async def await_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the first of ``tasks`` that finishes without an exception.

    The wait also ends when every task has finished or ``timeout`` fires.
    Afterwards the tasks are handed to ``aterminate_tasks_explicit`` together
    with the termination flags.

    Returns:
        ``(coro_id, result)`` of the first successful task, or
        ``(None, None)`` when no task succeeded (this case used to crash
        while unpacking ``None``).

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    tasks = _as_task_list(tasks)
    fastest_successful_wait_id: str = f'wait_task__fastest_successful__{uuid4()}'
    fastest_successful_info: FastestSuccessfulInfo = FastestSuccessfulInfo(fastest_successful_wait_id, tasks)
    results, termination_reason = await _await_completion(
        i, fastest_successful_info, FastestSuccessfulOnDoneHandler, timeout)
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return _unpack_single_result(results)


async def await_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Same as ``await_tasks_fastest_successful_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return await await_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


await_tasks_fastest_successful: Callable = await_tasks_fastest_successful_explicit
await_tasks_fastest_successful_im: Callable = await_tasks_fastest_successful_implicit


def wait_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_successful_explicit`` through ``RunCoro`` and return its result."""
    # Used to run `await_tasks_explicit` (the plain wait-for-all strategy)
    # and to drop the result.
    return i(RunCoro, await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


def wait_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Same as ``wait_tasks_fastest_successful_explicit`` but uses ``current_interface()``."""
    i: Interface = current_interface()
    return wait_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
1105 wait_for_termination, 1106 ) 1107 1108 1109wait_tasks_fastest_successful: Callable = wait_tasks_fastest_successful_explicit 1110wait_tasks_fastest_successful_im: Callable = wait_tasks_fastest_successful_implicit 1111 1112 1113# ================================================================================= 1114 1115 1116async def await_tasks_fastest_exception_explicit( 1117 i: Interface, 1118 tasks: Union[Task, List[Task]], 1119 terminate_tree: bool = False, 1120 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1121 timeout: Optional[float] = None, 1122 terminate_other_tasks: Optional[bool] = None, 1123 terminate_other_tasks_on_success: bool = False, 1124 terminate_other_tasks_on_failure: bool = False, 1125 terminate_other_tasks_on_timeout: bool = False, 1126 wait_for_termination: bool = False, 1127 ) -> Tuple[CoroID, Exception]: 1128 if isinstance(tasks, Task): 1129 tasks = [tasks] 1130 1131 if not tasks: 1132 raise TasksListIsEmptyError 1133 1134 fastest_exception_wait_id: str = str(f'wait_task__fastest_exception__{uuid4()}') 1135 fastest_exception_info: FastestExceptionInfo = FastestExceptionInfo(fastest_exception_wait_id, tasks) 1136 1137 if timeout is not None: 1138 # TODO: implement discard timer request when needed in order to improve performance 1139 timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, fastest_exception_info.on_timeout) 1140 1141 for another_task in tasks: 1142 another_task.add_on_done_handler(FastestExceptionOnDoneHandler(fastest_exception_info)) 1143 1144 await i(AsyncEventBus, AsyncEventBusRequest().wait(fastest_exception_wait_id)) 1145 results: List[Tuple[CoroID, Any, Exception]] = fastest_exception_info.gather() 1146 if fastest_exception_info.timeout: 1147 termination_reason: TerminationReason = TerminationReason.timeout 1148 elif fastest_exception_info.failure: 1149 termination_reason = TerminationReason.failure 1150 else: 1151 termination_reason = TerminationReason.success 
1152 1153 await aterminate_tasks_explicit( 1154 i, 1155 tasks, 1156 termination_reason, 1157 terminate_tree, 1158 graceful_termination_settings, 1159 terminate_other_tasks, 1160 terminate_other_tasks_on_success, 1161 terminate_other_tasks_on_failure, 1162 terminate_other_tasks_on_timeout, 1163 wait_for_termination, 1164 ) 1165 if not results: 1166 return None, None 1167 1168 coro_id, result, exception = results[0] 1169 if exception is not None: 1170 if isinstance(exception, TimeoutError) and (coro_id is not None): 1171 exception = SubTimeoutError() 1172 1173 return coro_id, exception 1174 1175 1176async def await_tasks_fastest_exception_implicit( 1177 tasks: Union[Task, List[Task]], 1178 terminate_tree: bool = False, 1179 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1180 timeout: Optional[float] = None, 1181 terminate_other_tasks: Optional[bool] = None, 1182 terminate_other_tasks_on_success: bool = False, 1183 terminate_other_tasks_on_failure: bool = False, 1184 terminate_other_tasks_on_timeout: bool = False, 1185 wait_for_termination: bool = False, 1186 ) -> List[Tuple[CoroID, Any, Exception]]: 1187 i: Interface = current_interface() 1188 return await await_tasks_explicit( 1189 i, 1190 tasks, 1191 terminate_tree, 1192 graceful_termination_settings, 1193 timeout, 1194 terminate_other_tasks, 1195 terminate_other_tasks_on_success, 1196 terminate_other_tasks_on_failure, 1197 terminate_other_tasks_on_timeout, 1198 wait_for_termination, 1199 ) 1200 1201 1202await_tasks_fastest_exception: Callable = await_tasks_fastest_exception_explicit 1203await_tasks_fastest_exception_im: Callable = await_tasks_fastest_exception_implicit 1204 1205 1206def wait_tasks_fastest_exception_explicit( 1207 i: Interface, 1208 tasks: Union[Task, List[Task]], 1209 terminate_tree: bool = False, 1210 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1211 timeout: Optional[float] = None, 1212 terminate_other_tasks: Optional[bool] = None, 
1213 terminate_other_tasks_on_success: bool = False, 1214 terminate_other_tasks_on_failure: bool = False, 1215 terminate_other_tasks_on_timeout: bool = False, 1216 wait_for_termination: bool = False, 1217 ) -> List[Tuple[CoroID, Any, Exception]]: 1218 i(RunCoro, await_tasks_fastest_exception_explicit, 1219 tasks, 1220 terminate_tree, 1221 graceful_termination_settings, 1222 timeout, 1223 terminate_other_tasks, 1224 terminate_other_tasks_on_success, 1225 terminate_other_tasks_on_failure, 1226 terminate_other_tasks_on_timeout, 1227 wait_for_termination, 1228 ) 1229 1230 1231def wait_tasks_fastest_exception_implicit( 1232 tasks: Union[Task, List[Task]], 1233 terminate_tree: bool = False, 1234 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1235 timeout: Optional[float] = None, 1236 terminate_other_tasks: Optional[bool] = None, 1237 terminate_other_tasks_on_success: bool = False, 1238 terminate_other_tasks_on_failure: bool = False, 1239 terminate_other_tasks_on_timeout: bool = False, 1240 wait_for_termination: bool = False, 1241 ) -> List[Tuple[CoroID, Any, Exception]]: 1242 i: Interface = current_interface() 1243 return wait_tasks_fastest_exception_explicit( 1244 i, 1245 tasks, 1246 terminate_tree, 1247 graceful_termination_settings, 1248 timeout, 1249 terminate_other_tasks, 1250 terminate_other_tasks_on_success, 1251 terminate_other_tasks_on_failure, 1252 terminate_other_tasks_on_timeout, 1253 wait_for_termination, 1254 ) 1255 1256 1257wait_tasks_fastest_exception: Callable = wait_tasks_fastest_exception_explicit 1258wait_tasks_fastest_exception_im: Callable = wait_tasks_fastest_exception_implicit 1259 1260 1261# ================================================================================= 1262 1263 1264async def await_tasks_fastest_custom_explicit( 1265 i: Interface, 1266 tasks: Union[Task, List[Task]], 1267 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1268 terminate_tree: bool = False, 1269 
graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1270 timeout: Optional[float] = None, 1271 terminate_other_tasks: Optional[bool] = None, 1272 terminate_other_tasks_on_success: bool = False, 1273 terminate_other_tasks_on_failure: bool = False, 1274 terminate_other_tasks_on_timeout: bool = False, 1275 wait_for_termination: bool = False, 1276 ) -> Tuple[CoroID, Any]: 1277 if isinstance(tasks, Task): 1278 tasks = [tasks] 1279 1280 if not tasks: 1281 raise TasksListIsEmptyError 1282 1283 fastest_custom_wait_id: str = str(f'wait_task__fastest_custom__{uuid4()}') 1284 fastest_custom_info: FastestCustomInfo = FastestCustomInfo(fastest_custom_wait_id, tasks, result_criteria_handler) 1285 1286 if timeout is not None: 1287 # TODO: implement discard timer request when needed in order to improve performance 1288 timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, fastest_custom_info.on_timeout) 1289 1290 for another_task in tasks: 1291 another_task.add_on_done_handler(FastestCustomOnDoneHandler(fastest_custom_info)) 1292 1293 await i(AsyncEventBus, AsyncEventBusRequest().wait(fastest_custom_wait_id)) 1294 results: List[Tuple[CoroID, Any, Exception]] = fastest_custom_info.gather() 1295 if fastest_custom_info.timeout: 1296 termination_reason: TerminationReason = TerminationReason.timeout 1297 elif fastest_custom_info.failure: 1298 termination_reason = TerminationReason.failure 1299 else: 1300 termination_reason = TerminationReason.success 1301 1302 await aterminate_tasks_explicit( 1303 i, 1304 tasks, 1305 termination_reason, 1306 terminate_tree, 1307 graceful_termination_settings, 1308 terminate_other_tasks, 1309 terminate_other_tasks_on_success, 1310 terminate_other_tasks_on_failure, 1311 terminate_other_tasks_on_timeout, 1312 wait_for_termination, 1313 ) 1314 if not results: 1315 return None, None 1316 1317 coro_id, result, exception = results[0] 1318 if exception is not None: 1319 if isinstance(exception, TimeoutError) 
and (coro_id is not None): 1320 exception = SubTimeoutError() 1321 1322 raise exception 1323 1324 return coro_id, result 1325 1326 1327async def await_tasks_fastest_custom_implicit( 1328 tasks: Union[Task, List[Task]], 1329 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1330 terminate_tree: bool = False, 1331 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1332 timeout: Optional[float] = None, 1333 terminate_other_tasks: Optional[bool] = None, 1334 terminate_other_tasks_on_success: bool = False, 1335 terminate_other_tasks_on_failure: bool = False, 1336 terminate_other_tasks_on_timeout: bool = False, 1337 wait_for_termination: bool = False, 1338 ) -> List[Tuple[CoroID, Any, Exception]]: 1339 i: Interface = current_interface() 1340 return await await_tasks_fastest_custom_explicit( 1341 i, 1342 tasks, 1343 result_criteria_handler, 1344 terminate_tree, 1345 graceful_termination_settings, 1346 timeout, 1347 terminate_other_tasks, 1348 terminate_other_tasks_on_success, 1349 terminate_other_tasks_on_failure, 1350 terminate_other_tasks_on_timeout, 1351 wait_for_termination, 1352 ) 1353 1354 1355await_tasks_fastest_custom: Callable = await_tasks_fastest_custom_explicit 1356await_tasks_fastest_custom_im: Callable = await_tasks_fastest_custom_implicit 1357 1358 1359def wait_tasks_fastest_custom_explicit( 1360 i: Interface, 1361 tasks: Union[Task, List[Task]], 1362 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1363 terminate_tree: bool = False, 1364 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1365 timeout: Optional[float] = None, 1366 terminate_other_tasks: Optional[bool] = None, 1367 terminate_other_tasks_on_success: bool = False, 1368 terminate_other_tasks_on_failure: bool = False, 1369 terminate_other_tasks_on_timeout: bool = False, 1370 wait_for_termination: bool = False, 1371 ) -> List[Tuple[CoroID, Any, Exception]]: 1372 i(RunCoro, await_tasks_fastest_custom_explicit, 
1373 tasks, 1374 result_criteria_handler, 1375 terminate_tree, 1376 graceful_termination_settings, 1377 timeout, 1378 terminate_other_tasks, 1379 terminate_other_tasks_on_success, 1380 terminate_other_tasks_on_failure, 1381 terminate_other_tasks_on_timeout, 1382 wait_for_termination, 1383 ) 1384 1385 1386def wait_tasks_fastest_custom_implicit( 1387 tasks: Union[Task, List[Task]], 1388 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1389 terminate_tree: bool = False, 1390 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1391 timeout: Optional[float] = None, 1392 terminate_other_tasks: Optional[bool] = None, 1393 terminate_other_tasks_on_success: bool = False, 1394 terminate_other_tasks_on_failure: bool = False, 1395 terminate_other_tasks_on_timeout: bool = False, 1396 wait_for_termination: bool = False, 1397 ) -> List[Tuple[CoroID, Any, Exception]]: 1398 i: Interface = current_interface() 1399 return wait_tasks_fastest_custom_explicit( 1400 i, 1401 tasks, 1402 result_criteria_handler, 1403 terminate_tree, 1404 graceful_termination_settings, 1405 timeout, 1406 terminate_other_tasks, 1407 terminate_other_tasks_on_success, 1408 terminate_other_tasks_on_failure, 1409 terminate_other_tasks_on_timeout, 1410 wait_for_termination, 1411 ) 1412 1413 1414wait_tasks_fastest_custom: Callable = wait_tasks_fastest_custom_explicit 1415wait_tasks_fastest_custom_im: Callable = wait_tasks_fastest_custom_implicit 1416 1417 1418# ================================================================================= 1419 1420 1421async def await_tasks_atomic_explicit( 1422 i: Interface, 1423 tasks: Union[Task, List[Task]], 1424 terminate_tree: bool = False, 1425 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1426 timeout: Optional[float] = None, 1427 terminate_other_tasks: Optional[bool] = None, 1428 terminate_other_tasks_on_success: bool = False, 1429 terminate_other_tasks_on_failure: bool = False, 1430 
terminate_other_tasks_on_timeout: bool = False, 1431 wait_for_termination: bool = False, 1432 ): 1433 if terminate_other_tasks is not None: 1434 terminate_other_tasks_on_success = terminate_other_tasks 1435 terminate_other_tasks_on_failure = terminate_other_tasks 1436 terminate_other_tasks_on_timeout = terminate_other_tasks 1437 1438 if isinstance(tasks, Task): 1439 tasks = [tasks] 1440 1441 if not tasks: 1442 raise TasksListIsEmptyError 1443 1444 atomic_wait_id: str = str(f'wait_task__atomic__{uuid4()}') 1445 atomic_info: AtomicInfo = AtomicInfo(atomic_wait_id, tasks) 1446 1447 if timeout is not None: 1448 # TODO: implement discard timer request when needed in order to improve performance 1449 timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, atomic_info.on_timeout) 1450 1451 for another_task in tasks: 1452 another_task.add_on_done_handler(AtomicOnDoneHandler(atomic_info)) 1453 1454 await i(AsyncEventBus, AsyncEventBusRequest().wait(atomic_wait_id)) 1455 results: List[Tuple[CoroID, Any, Exception]] = atomic_info.gather() 1456 if atomic_info.timeout: 1457 termination_reason: TerminationReason = TerminationReason.timeout 1458 elif atomic_info.failure: 1459 termination_reason = TerminationReason.failure 1460 else: 1461 termination_reason = TerminationReason.success 1462 1463 await aterminate_tasks_explicit( 1464 i, 1465 tasks, 1466 termination_reason, 1467 terminate_tree, 1468 graceful_termination_settings, 1469 terminate_other_tasks, 1470 terminate_other_tasks_on_success, 1471 terminate_other_tasks_on_failure, 1472 terminate_other_tasks_on_timeout, 1473 wait_for_termination, 1474 ) 1475 return results 1476 1477 1478async def await_tasks_atomic_implicit( 1479 tasks: Union[Task, List[Task]], 1480 terminate_tree: bool = False, 1481 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1482 timeout: Optional[float] = None, 1483 terminate_other_tasks: Optional[bool] = None, 1484 terminate_other_tasks_on_success: bool = 
False, 1485 terminate_other_tasks_on_failure: bool = False, 1486 terminate_other_tasks_on_timeout: bool = False, 1487 wait_for_termination: bool = False, 1488 ) -> List[Tuple[CoroID, Any, Exception]]: 1489 i: Interface = current_interface() 1490 return await await_tasks_atomic_explicit( 1491 i, 1492 tasks, 1493 terminate_tree, 1494 graceful_termination_settings, 1495 timeout, 1496 terminate_other_tasks, 1497 terminate_other_tasks_on_success, 1498 terminate_other_tasks_on_failure, 1499 terminate_other_tasks_on_timeout, 1500 wait_for_termination, 1501 ) 1502 1503 1504await_tasks_atomic: Callable = await_tasks_atomic_explicit 1505await_tasks_atomic_im: Callable = await_tasks_atomic_implicit 1506 1507 1508def wait_tasks_atomic_explicit( 1509 i: Interface, 1510 tasks: Union[Task, List[Task]], 1511 terminate_tree: bool = False, 1512 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1513 timeout: Optional[float] = None, 1514 terminate_other_tasks: Optional[bool] = None, 1515 terminate_other_tasks_on_success: bool = False, 1516 terminate_other_tasks_on_failure: bool = False, 1517 terminate_other_tasks_on_timeout: bool = False, 1518 wait_for_termination: bool = False, 1519 ) -> List[Tuple[CoroID, Any, Exception]]: 1520 i(RunCoro, await_tasks_atomic_explicit, 1521 tasks, 1522 terminate_tree, 1523 graceful_termination_settings, 1524 timeout, 1525 terminate_other_tasks, 1526 terminate_other_tasks_on_success, 1527 terminate_other_tasks_on_failure, 1528 terminate_other_tasks_on_timeout, 1529 wait_for_termination, 1530 ) 1531 1532 1533def wait_tasks_atomic_implicit( 1534 tasks: Union[Task, List[Task]], 1535 terminate_tree: bool = False, 1536 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1537 timeout: Optional[float] = None, 1538 terminate_other_tasks: Optional[bool] = None, 1539 terminate_other_tasks_on_success: bool = False, 1540 terminate_other_tasks_on_failure: bool = False, 1541 terminate_other_tasks_on_timeout: bool = 
False, 1542 wait_for_termination: bool = False, 1543 ) -> List[Tuple[CoroID, Any, Exception]]: 1544 i: Interface = current_interface() 1545 return wait_tasks_atomic_explicit( 1546 i, 1547 tasks, 1548 terminate_tree, 1549 graceful_termination_settings, 1550 timeout, 1551 terminate_other_tasks, 1552 terminate_other_tasks_on_success, 1553 terminate_other_tasks_on_failure, 1554 terminate_other_tasks_on_timeout, 1555 wait_for_termination, 1556 ) 1557 1558 1559wait_tasks_atomic: Callable = wait_tasks_atomic_explicit 1560wait_tasks_atomic_im: Callable = wait_tasks_atomic_implicit 1561 1562 1563# ================================================================================= 1564 1565 1566async def await_tasks_atomic_custom_explicit( 1567 i: Interface, 1568 tasks: Union[Task, List[Task]], 1569 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1570 first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None, 1571 terminate_tree: bool = False, 1572 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1573 timeout: Optional[float] = None, 1574 terminate_other_tasks: Optional[bool] = None, 1575 terminate_other_tasks_on_success: bool = False, 1576 terminate_other_tasks_on_failure: bool = False, 1577 terminate_other_tasks_on_timeout: bool = False, 1578 wait_for_termination: bool = False, 1579 ): 1580 if terminate_other_tasks is not None: 1581 terminate_other_tasks_on_success = terminate_other_tasks 1582 terminate_other_tasks_on_failure = terminate_other_tasks 1583 terminate_other_tasks_on_timeout = terminate_other_tasks 1584 1585 if isinstance(tasks, Task): 1586 tasks = [tasks] 1587 1588 if not tasks: 1589 raise TasksListIsEmptyError 1590 1591 atomic_custom_wait_id: str = str(f'wait_task__atomic_custom__{uuid4()}') 1592 atomic_custom_info: AtomicCustomInfo = AtomicCustomInfo( 1593 atomic_custom_wait_id, 1594 tasks, 1595 result_criteria_handler, 1596 first_failed_result_formatter, 1597 ) 1598 1599 if timeout 
is not None: 1600 # TODO: implement discard timer request when needed in order to improve performance 1601 timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, atomic_custom_info.on_timeout) 1602 1603 for another_task in tasks: 1604 another_task.add_on_done_handler(AtomicCustomOnDoneHandler(atomic_custom_info)) 1605 1606 await i(AsyncEventBus, AsyncEventBusRequest().wait(atomic_custom_wait_id)) 1607 results: List[Tuple[CoroID, Any, Exception]] = atomic_custom_info.gather() 1608 if atomic_custom_info.timeout: 1609 termination_reason: TerminationReason = TerminationReason.timeout 1610 elif atomic_custom_info.failure: 1611 termination_reason = TerminationReason.failure 1612 else: 1613 termination_reason = TerminationReason.success 1614 1615 await aterminate_tasks_explicit( 1616 i, 1617 tasks, 1618 termination_reason, 1619 terminate_tree, 1620 graceful_termination_settings, 1621 terminate_other_tasks, 1622 terminate_other_tasks_on_success, 1623 terminate_other_tasks_on_failure, 1624 terminate_other_tasks_on_timeout, 1625 wait_for_termination, 1626 ) 1627 return results 1628 1629 1630async def await_tasks_atomic_custom_implicit( 1631 tasks: Union[Task, List[Task]], 1632 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1633 first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None, 1634 terminate_tree: bool = False, 1635 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1636 timeout: Optional[float] = None, 1637 terminate_other_tasks: Optional[bool] = None, 1638 terminate_other_tasks_on_success: bool = False, 1639 terminate_other_tasks_on_failure: bool = False, 1640 terminate_other_tasks_on_timeout: bool = False, 1641 wait_for_termination: bool = False, 1642 ) -> List[Tuple[CoroID, Any, Exception]]: 1643 i: Interface = current_interface() 1644 return await await_tasks_atomic_custom_explicit( 1645 i, 1646 tasks, 1647 result_criteria_handler, 1648 
first_failed_result_formatter, 1649 terminate_tree, 1650 graceful_termination_settings, 1651 timeout, 1652 terminate_other_tasks, 1653 terminate_other_tasks_on_success, 1654 terminate_other_tasks_on_failure, 1655 terminate_other_tasks_on_timeout, 1656 wait_for_termination, 1657 ) 1658 1659 1660await_tasks_atomic_custom: Callable = await_tasks_atomic_custom_explicit 1661await_tasks_atomic_custom_im: Callable = await_tasks_atomic_custom_implicit 1662 1663 1664def wait_tasks_atomic_custom_explicit( 1665 i: Interface, 1666 tasks: Union[Task, List[Task]], 1667 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1668 first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None, 1669 terminate_tree: bool = False, 1670 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1671 timeout: Optional[float] = None, 1672 terminate_other_tasks: Optional[bool] = None, 1673 terminate_other_tasks_on_success: bool = False, 1674 terminate_other_tasks_on_failure: bool = False, 1675 terminate_other_tasks_on_timeout: bool = False, 1676 wait_for_termination: bool = False, 1677 ) -> List[Tuple[CoroID, Any, Exception]]: 1678 i(RunCoro, await_tasks_atomic_custom_explicit, 1679 tasks, 1680 result_criteria_handler, 1681 first_failed_result_formatter, 1682 terminate_tree, 1683 graceful_termination_settings, 1684 timeout, 1685 terminate_other_tasks, 1686 terminate_other_tasks_on_success, 1687 terminate_other_tasks_on_failure, 1688 terminate_other_tasks_on_timeout, 1689 wait_for_termination, 1690 ) 1691 1692 1693def wait_tasks_atomic_custom_implicit( 1694 tasks: Union[Task, List[Task]], 1695 result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 1696 first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None, 1697 terminate_tree: bool = False, 1698 graceful_termination_settings: Optional[GracefulTerminationSettings] = None, 1699 timeout: Optional[float] = None, 1700 terminate_other_tasks: 
Optional[bool] = None, 1701 terminate_other_tasks_on_success: bool = False, 1702 terminate_other_tasks_on_failure: bool = False, 1703 terminate_other_tasks_on_timeout: bool = False, 1704 wait_for_termination: bool = False, 1705 ) -> List[Tuple[CoroID, Any, Exception]]: 1706 i: Interface = current_interface() 1707 return wait_tasks_atomic_custom_explicit( 1708 i, 1709 tasks, 1710 result_criteria_handler, 1711 first_failed_result_formatter, 1712 terminate_tree, 1713 graceful_termination_settings, 1714 timeout, 1715 terminate_other_tasks, 1716 terminate_other_tasks_on_success, 1717 terminate_other_tasks_on_failure, 1718 terminate_other_tasks_on_timeout, 1719 wait_for_termination, 1720 ) 1721 1722 1723wait_tasks_atomic_custom: Callable = wait_tasks_atomic_custom_explicit 1724wait_tasks_atomic_custom_im: Callable = wait_tasks_atomic_custom_implicit 1725 1726 1727# ================================================================================= 1728 1729 1730def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]): 1731 wait_id: str = str(f'wait_task__{uuid4()}') 1732 def on_done_handler(): 1733 put_request_to_service(AsyncEventBus, wait_id, None, CoroPriority.high) 1734 1735 task.add_on_done_handler(on_done_handler) 1736 i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id)) 1737 return task.result 1738 1739 1740def wait_task_implicit(task: Union[Task, List[Task]]): 1741 i: Interface = current_interface() 1742 wait_id: str = str(f'wait_task__{uuid4()}') 1743 def on_done_handler(): 1744 put_request_to_service(AsyncEventBus, wait_id, None, CoroPriority.high) 1745 1746 task.add_on_done_handler(on_done_handler) 1747 i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id)) 1748 return task.result 1749 1750 1751wait_task = wait_task_explicit 1752wait_task_im = wait_task_implicit 1753 1754 1755async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]): 1756 wait_id: str = str(f'wait_task__{uuid4()}') 1757 def on_done_handler(): 1758 
put_request_to_service(AsyncEventBus, wait_id, None, CoroPriority.high) 1759 1760 task.add_on_done_handler(on_done_handler) 1761 await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id)) 1762 return task.result 1763 1764 1765async def await_task_implicit(task: Union[Task, List[Task]]): 1766 i: Interface = current_interface() 1767 wait_id: str = str(f'wait_task__{uuid4()}') 1768 def on_done_handler(): 1769 put_request_to_service(AsyncEventBus, wait_id, None, CoroPriority.high) 1770 1771 task.add_on_done_handler(on_done_handler) 1772 await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id)) 1773 return task.result 1774 1775 1776await_task = await_task_explicit 1777await_task_im = await_task_implicit
class
WaitType(enum.IntEnum):
class WaitType(IntEnum):
    """Identifiers of the task-waiting strategies provided by this module."""

    normal = 0
    fastest = 1
    fastest_successful = 2
    fastest_exception = 3
    fastest_custom = 4
    atomic = 5
    # Was 5, which made it a mere alias of ``atomic`` (IntEnum treats members
    # with a duplicate value as aliases), so the two strategies were
    # indistinguishable.
    atomic_custom = 6
An enumeration.
normal =
<WaitType.normal: 0>
fastest =
<WaitType.fastest: 1>
fastest_successful =
<WaitType.fastest_successful: 2>
fastest_exception =
<WaitType.fastest_exception: 3>
fastest_custom =
<WaitType.fastest_custom: 4>
atomic =
<WaitType.atomic: 5>
atomic_custom =
<WaitType.atomic: 5>
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class
TasksListIsEmptyError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class
IsNotDoneYetError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class
NormalOnDoneHandler:
class NormalOnDoneHandler:
    """Callable registered on a task; forwards the finished task's outcome to a NormalInfo."""

    def __init__(self, normal_info: 'NormalInfo') -> None:
        # Collector that receives one (coro_id, result, exception) report per task.
        self.normal_info: 'NormalInfo' = normal_info

    def __call__(self, task: Task) -> Any:
        """Report ``task``'s coro id, result and exception to the collector."""
        outcome = (task.coro_id, task._result, task._exception)
        self.normal_info.add_result(*outcome)
NormalOnDoneHandler( normal_info: NormalInfo)
normal_info: NormalInfo
class
NormalInfo:
class NormalInfo:
    """Accumulates task outcomes for one wait operation and wakes the waiter.

    Results are appended through add_result(); once ``coroutines_num`` results
    have arrived (or on_timeout() fires first) the object becomes "done",
    stops accepting further results and posts ``wait_done_id`` to the
    AsyncEventBus service.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        self.tasks: List[Task] = tasks
        # Number of results required before the wait is considered complete.
        self.coroutines_num: int = len(tasks)
        self.results: List[Tuple[CoroID, Any, Exception]] = list()
        # Event id posted to AsyncEventBus when the wait completes.
        self.wait_done_id: str = wait_done_id
        # Set once done; late results are dropped afterwards.
        self.ignore: bool = False
        self._done: bool = False
        self._timeout: bool = False
        self._failure: bool = False

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record one task outcome; mark the wait done when enough have arrived."""
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if len(self.results) >= self.coroutines_num:
            self.done = True

    @property
    def failure(self) -> bool:
        """Whether the wait was flagged as failed."""
        return self._failure

    @failure.setter
    def failure(self, value: bool) -> None:
        self._failure = value

    @property
    def done(self) -> bool:
        """Whether the wait has completed."""
        return self._done

    @done.setter
    def done(self, value: bool) -> None:
        # Completion is one-shot: once done, further assignments are ignored so
        # the waiter is notified exactly once.
        if self._done:
            return

        self._done = value
        if value:
            self.ignore = True
            put_request_to_service(AsyncEventBus, self.wait_done_id, None, CoroPriority.high)

    @property
    def timeout(self) -> bool:
        """Whether the wait ended because the timeout fired."""
        return self._timeout

    def on_timeout(self) -> None:
        """Timer callback: finish the wait with the timeout flag set.

        A timer that fires after the wait already completed is ignored;
        otherwise a wait that finished normally would be reported as timed out.
        """
        if self._done:
            return

        self._timeout = True
        self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return the collected results.

        Raises:
            IsNotDoneYetError: the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        return self.results
NormalInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
    """Initialise an empty, not-yet-done collector for ``tasks``.

    ``wait_done_id`` is stored as-is; ``coroutines_num`` starts at
    ``len(tasks)`` and every status flag starts as False.
    """
    self.tasks: List[Task] = tasks
    self.coroutines_num: int = len(tasks)
    # Collected (coro_id, result, exception) tuples.
    self.results: List[Tuple[CoroID, Any, Exception]] = list()
    self.wait_done_id: str = wait_done_id
    self.ignore: bool = False
    self._done: bool = False
    self._timeout: bool = False
    self._failure: bool = False
tasks: List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]
class
FastestOnDoneHandler:
class FastestOnDoneHandler:
    """On-done callback that reports a finished task's outcome to a ``FastestInfo``."""

    def __init__(self, fastest_info: 'FastestInfo') -> None:
        self.fastest_info: 'FastestInfo' = fastest_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        # add_result() takes three positional arguments; the outcome used to be
        # packed into a single tuple, which raised TypeError on every call.
        self.fastest_info.add_result(task.coro_id, task._result, task._exception)
FastestOnDoneHandler( fastest_info: FastestInfo)
fastest_info: FastestInfo
class FastestInfo(NormalInfo):
    """``NormalInfo`` variant whose required number of outcomes is fixed to one."""

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id=wait_done_id, tasks=tasks)
        # Replace the task count set by the base initialiser: a single
        # recorded outcome is enough to reach the completion threshold.
        self.coroutines_num: int = 1
FastestInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
Inherited Members
class
FastestSuccessfulOnDoneHandler:
class FastestSuccessfulOnDoneHandler:
    """On-done callback that reports a finished task's outcome to a ``FastestSuccessfulInfo``."""

    def __init__(self, fastest_successful_info: 'FastestSuccessfulInfo') -> None:
        self.fastest_successful_info: 'FastestSuccessfulInfo' = fastest_successful_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        # add_result() takes three positional arguments; the outcome used to be
        # packed into a single tuple, which raised TypeError on every call.
        self.fastest_successful_info.add_result(task.coro_id, task._result, task._exception)
FastestSuccessfulOnDoneHandler( fastest_successful_info: FastestSuccessfulInfo)
fastest_successful_info: FastestSuccessfulInfo
class FastestSuccessfulInfo(NormalInfo):
    """Wait that finishes on the first outcome without an exception.

    If no such outcome arrives, the wait finishes once every task has reported.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        # Stays None until an exception-free outcome is recorded.
        self.fastest_successful_result: Optional[Tuple[CoroID, Any, Exception]] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish on the first success or when all are in."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        succeeded = exception is None
        if succeeded:
            self.fastest_successful_result = outcome

        if succeeded or (self.coroutines_num <= len(self.results)):
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the winning outcome (or ``None``).

        As a side effect, the ``failure`` flag is raised when there is no winner.

        Raises:
            IsNotDoneYetError: if the wait has not finished yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        winner = self.fastest_successful_result
        if winner is None:
            self.failure = True

        return [winner]
FastestSuccessfulInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
def
add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
    """Record an outcome; finish on the first success or when all tasks reported.

    Outcomes arriving while ``self.ignore`` is set are dropped.
    """
    if self.ignore:
        return

    outcome = (coro_id, result, exception)
    self.results.append(outcome)
    succeeded = exception is None
    if succeeded:
        # First exception-free outcome wins.
        self.fastest_successful_result = outcome

    if succeeded or (self.coroutines_num <= len(self.results)):
        self.done = True
Inherited Members
class
FastestExceptionOnDoneHandler:
class FastestExceptionOnDoneHandler:
    """On-done callback that reports a finished task's outcome to a ``FastestExceptionInfo``."""

    def __init__(self, fastest_exception_info: 'FastestExceptionInfo') -> None:
        self.fastest_exception_info: 'FastestExceptionInfo' = fastest_exception_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        # add_result() takes three positional arguments; the outcome used to be
        # packed into a single tuple, which raised TypeError on every call.
        self.fastest_exception_info.add_result(task.coro_id, task._result, task._exception)
FastestExceptionOnDoneHandler( fastest_exception_info: FastestExceptionInfo)
fastest_exception_info: FastestExceptionInfo
class FastestExceptionInfo(NormalInfo):
    """Wait that finishes on the first outcome carrying an exception.

    If no such outcome arrives, the wait finishes once every task has reported.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        # Stays None until an outcome with an exception is recorded.
        self.fastest_exception_result: Optional[Tuple[CoroID, Any, Exception]] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish on the first exception or when all are in."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        raised = exception is not None
        if raised:
            self.fastest_exception_result = outcome

        if raised or (self.coroutines_num <= len(self.results)):
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the first failing outcome (or ``None``).

        As a side effect, the ``failure`` flag is raised when no task raised.

        Raises:
            IsNotDoneYetError: if the wait has not finished yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        first_raised = self.fastest_exception_result
        if first_raised is None:
            self.failure = True

        return [first_raised]
FastestExceptionInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
def
add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
    """Record an outcome; finish on the first exception or when all tasks reported.

    Outcomes arriving while ``self.ignore`` is set are dropped.
    """
    if self.ignore:
        return

    outcome = (coro_id, result, exception)
    self.results.append(outcome)
    raised = exception is not None
    if raised:
        # First outcome carrying an exception wins.
        self.fastest_exception_result = outcome

    if raised or (self.coroutines_num <= len(self.results)):
        self.done = True
Inherited Members
class
FastestCustomOnDoneHandler:
class FastestCustomOnDoneHandler:
    """On-done callback that reports a finished task's outcome to a ``FastestCustomInfo``."""

    def __init__(self, fastest_custom_info: 'FastestCustomInfo') -> None:
        self.fastest_custom_info: 'FastestCustomInfo' = fastest_custom_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        # add_result() takes three positional arguments; the outcome used to be
        # packed into a single tuple, which raised TypeError on every call.
        self.fastest_custom_info.add_result(task.coro_id, task._result, task._exception)
FastestCustomOnDoneHandler( fastest_custom_info: FastestCustomInfo)
fastest_custom_info: FastestCustomInfo
class FastestCustomInfo(NormalInfo):
    """Wait that finishes on the first outcome accepted by a caller-supplied predicate.

    If no outcome is accepted, the wait finishes once every task has reported.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
        super().__init__(wait_done_id, tasks)
        # Stays None until the predicate accepts an outcome.
        self.fastest_custom_result: Optional[Tuple[CoroID, Any, Exception]] = None
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish when the predicate accepts it or all are in."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        if self.result_criteria_handler(coro_id, result, exception):
            self.fastest_custom_result = outcome
            self.done = True
        elif self.coroutines_num <= len(self.results):
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the accepted outcome (or ``None``).

        As a side effect, the ``failure`` flag is raised when nothing was accepted.

        Raises:
            IsNotDoneYetError: if the wait has not finished yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        accepted = self.fastest_custom_result
        if accepted is None:
            self.failure = True

        return [accepted]
FastestCustomInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool])
def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
    """Initialise the base bookkeeping and remember the acceptance predicate.

    Args:
        wait_done_id: passed through to the base initialiser.
        tasks: passed through to the base initialiser.
        result_criteria_handler: called with ``(coro_id, result, exception)``
            for each outcome; a truthy return marks that outcome as the winner.
    """
    super().__init__(wait_done_id, tasks)
    # No outcome has been accepted yet.
    self.fastest_custom_result: Optional[Tuple[CoroID, Any, Exception]] = None
    self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
def
add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
    """Record an outcome; finish when the predicate accepts it or all tasks reported.

    Outcomes arriving while ``self.ignore`` is set are dropped.
    """
    if self.ignore:
        return

    outcome = (coro_id, result, exception)
    self.results.append(outcome)
    if self.result_criteria_handler(coro_id, result, exception):
        # First outcome accepted by the predicate wins.
        self.fastest_custom_result = outcome
        self.done = True
    elif self.coroutines_num <= len(self.results):
        self.done = True
Inherited Members
class
AtomicOnDoneHandler:
class AtomicOnDoneHandler:
    """On-done callback that reports a finished task's outcome to an ``AtomicInfo``."""

    def __init__(self, atomic_info: 'AtomicInfo') -> None:
        self.atomic_info: 'AtomicInfo' = atomic_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        report = self.atomic_info.add_result
        report(task.coro_id, task._result, task._exception)
AtomicOnDoneHandler( atomic_info: AtomicInfo)
atomic_info: AtomicInfo
class AtomicInfo(NormalInfo):
    """All-or-nothing wait: the first outcome with an exception ends it as a failure.

    Without a failure, the wait finishes once every task has reported.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task], include_first_failure: bool = True) -> None:
        super().__init__(wait_done_id, tasks)
        # Whether the failing outcome itself is kept in ``results``.
        self.include_first_failure: bool = include_first_failure

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; an exception flags failure and finishes the wait."""
        if self.ignore:
            return

        succeeded = exception is None
        if succeeded or self.include_first_failure:
            self.results.append((coro_id, result, exception))

        if not succeeded:
            self.failure = True
            self.done = True

        if self.coroutines_num <= len(self.results):
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return outcomes ordered like ``self.tasks``, ``None`` for tasks without one.

        Raises:
            IsNotDoneYetError: if the wait has not finished yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        # Index the recorded outcomes by coroutine id, skipping None entries.
        outcome_by_id: Dict[CoroID, Tuple[Any, Exception]] = {
            coro_id: (result, exception)
            for coro_id, result, exception in (entry for entry in self.results if entry is not None)
        }

        ordered: List[Tuple[CoroID, Any, Exception]] = []
        for task in self.tasks:
            task_id = task.coro_id
            ordered.append((task_id, *outcome_by_id[task_id]) if task_id in outcome_by_id else None)

        return ordered
AtomicInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], include_first_failure: bool = True)
def
add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
    """Record an outcome; an exception flags failure and finishes the wait.

    The failing outcome is stored only when ``self.include_first_failure`` is
    set. Outcomes arriving while ``self.ignore`` is set are dropped.
    """
    if self.ignore:
        return

    succeeded = exception is None
    if succeeded or self.include_first_failure:
        self.results.append((coro_id, result, exception))

    if not succeeded:
        self.failure = True
        self.done = True

    # Also finish when every task has reported.
    if self.coroutines_num <= len(self.results):
        self.done = True
def
gather(self) -> List[Tuple[int, Any, Exception]]:
def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
    """Return outcomes ordered like ``self.tasks``, ``None`` for tasks without one.

    Raises:
        IsNotDoneYetError: if the wait has not finished yet.
    """
    if not self.done:
        raise IsNotDoneYetError

    # Index the recorded outcomes by coroutine id, skipping None entries.
    outcome_by_id: Dict[CoroID, Tuple[Any, Exception]] = {
        coro_id: (result, exception)
        for coro_id, result, exception in (entry for entry in self.results if entry is not None)
    }

    ordered: List[Tuple[CoroID, Any, Exception]] = []
    for task in self.tasks:
        task_id = task.coro_id
        ordered.append((task_id, *outcome_by_id[task_id]) if task_id in outcome_by_id else None)

    return ordered
Inherited Members
class
AtomicCustomOnDoneHandler:
class AtomicCustomOnDoneHandler:
    """On-done callback that reports a finished task's outcome to an ``AtomicCustomInfo``."""

    def __init__(self, atomic_custom_info: 'AtomicCustomInfo') -> None:
        self.atomic_custom_info: 'AtomicCustomInfo' = atomic_custom_info

    def __call__(self, task: Task) -> Any:
        """Forward the task's coroutine id, result and exception to the info object."""
        report = self.atomic_custom_info.add_result
        report(task.coro_id, task._result, task._exception)
AtomicCustomOnDoneHandler( atomic_custom_info: AtomicCustomInfo)
atomic_custom_info: AtomicCustomInfo
def
success_result_criteria_handler(coro_id: int, result: typing.Any, exception: Exception) -> bool:
def
exception_result_criteria_handler(coro_id: int, result: typing.Any, exception: Exception) -> bool:
def
same_first_failed_result_formatter(coro_id: int, result: typing.Any, exception: Exception) -> Any:
def
none_first_failed_result_formatter(coro_id: int, result: typing.Any, exception: Exception) -> Any:
class AtomicCustomInfo(NormalInfo):
    """All-or-nothing wait whose notion of success is a caller-supplied predicate.

    The first outcome rejected by ``result_criteria_handler`` flags the wait as
    failed and finishes it; otherwise the wait finishes once every task has
    reported.
    """

    def __init__(
            self,
            wait_done_id: str,
            tasks: List[Task],
            result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
            first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
            ) -> None:
        """
        Args:
            wait_done_id: passed through to the base initialiser.
            tasks: passed through to the base initialiser.
            result_criteria_handler: called with ``(coro_id, result, exception)``;
                a falsy return marks the outcome as the first failure.
            first_failed_result_formatter: converts the failing outcome into the
                entry stored in ``results``; defaults to
                ``none_first_failed_result_formatter``.
        """
        super().__init__(wait_done_id, tasks)
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
        if first_failed_result_formatter is None:
            first_failed_result_formatter = none_first_failed_result_formatter

        self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = first_failed_result_formatter

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record one outcome and decide whether the wait is over.

        Each outcome is stored exactly once, and only the criteria handler
        decides whether it counts as a failure. Previously a stale leading
        section stored every outcome a second time (so completion fired after
        half of the tasks) and finished the wait on any exception, bypassing
        the custom criteria.
        """
        if self.ignore:
            return

        failed = not self.result_criteria_handler(coro_id, result, exception)
        if failed:
            entry = self.first_failed_result_formatter(coro_id, result, exception)
        else:
            entry = (coro_id, result, exception)

        # Store the entry before signalling completion so it is never lost.
        self.results.append(entry)
        if failed:
            self.failure = True
            self.done = True

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return outcomes ordered like ``self.tasks``, ``None`` for tasks without one.

        ``None`` entries in ``results`` are skipped.
        NOTE(review): any other stored entry must unpack into three items, so a
        custom formatter presumably has to return ``None`` or a 3-tuple; confirm.

        Raises:
            IsNotDoneYetError: if the wait has not finished yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        outcome_by_id: Dict[CoroID, Tuple[Any, Exception]] = {
            coro_id: (result, exception)
            for coro_id, result, exception in (entry for entry in self.results if entry is not None)
        }

        ordered: List[Tuple[CoroID, Any, Exception]] = []
        for task in self.tasks:
            task_id = task.coro_id
            ordered.append((task_id, *outcome_by_id[task_id]) if task_id in outcome_by_id else None)

        return ordered
AtomicCustomInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None)
def __init__(
        self,
        wait_done_id: str,
        tasks: List[Task],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        ) -> None:
    """Initialise the base bookkeeping and store the two callbacks.

    Args:
        wait_done_id: passed through to the base initialiser.
        tasks: passed through to the base initialiser.
        result_criteria_handler: predicate called with
            ``(coro_id, result, exception)``.
        first_failed_result_formatter: formatter for the failing outcome;
            ``none_first_failed_result_formatter`` is used when omitted.
    """
    super().__init__(wait_done_id, tasks)
    self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
    if first_failed_result_formatter is None:
        chosen_formatter = none_first_failed_result_formatter
    else:
        chosen_formatter = first_failed_result_formatter

    self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = chosen_formatter
def
add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
    """Record one outcome and decide whether the wait is over.

    An outcome rejected by ``self.result_criteria_handler`` is converted with
    ``self.first_failed_result_formatter``, flags the wait as failed and
    finishes it. Otherwise the wait finishes once ``self.coroutines_num``
    entries have been stored.

    Each outcome is stored exactly once, and only the criteria handler decides
    whether it counts as a failure. Previously a stale leading section stored
    every outcome a second time (so completion fired after half of the tasks)
    and finished the wait on any exception, bypassing the custom criteria.
    """
    if self.ignore:
        return

    failed = not self.result_criteria_handler(coro_id, result, exception)
    if failed:
        entry = self.first_failed_result_formatter(coro_id, result, exception)
    else:
        entry = (coro_id, result, exception)

    # Store the entry before signalling completion so it is never lost.
    self.results.append(entry)
    if failed:
        self.failure = True
        self.done = True

    if len(self.results) >= self.coroutines_num:
        self.done = True
def
gather(self) -> List[Tuple[int, Any, Exception]]:
def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
    """Return outcomes ordered like ``self.tasks``, ``None`` for tasks without one.

    ``None`` entries in ``self.results`` are skipped; every other entry is
    unpacked as ``(coro_id, result, exception)``.

    Raises:
        IsNotDoneYetError: if the wait has not finished yet.
    """
    if not self.done:
        raise IsNotDoneYetError

    outcome_by_id: Dict[CoroID, Tuple[Any, Exception]] = {
        coro_id: (result, exception)
        for coro_id, result, exception in (entry for entry in self.results if entry is not None)
    }

    ordered: List[Tuple[CoroID, Any, Exception]] = []
    for task in self.tasks:
        task_id = task.coro_id
        ordered.append((task_id, *outcome_by_id[task_id]) if task_id in outcome_by_id else None)

    return ordered
Inherited Members
class
GracefulTerminationSettings:
class GracefulTerminationSettings:
    """Plain container for the parameters of a graceful task termination.

    The values are stored unchanged as attributes of the same names.
    """

    def __init__(self,
                 phase_time_limit: Optional[float],
                 ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None,
                 first_phase_is_wait: bool = True,
                 last_phase_is_kill: bool = True,
                 ) -> None:
        # Timing of a termination phase.
        self.phase_time_limit: Optional[float] = phase_time_limit
        # Phase toggles.
        self.first_phase_is_wait: bool = first_phase_is_wait
        self.last_phase_is_kill: bool = last_phase_is_kill
        # Exception triple (type, value, traceback).
        self.ex_type: Type[Exception] = ex_type
        self.ex_value: Exception = ex_value
        self.ex_traceback: Any = ex_traceback
GracefulTerminationSettings( phase_time_limit: typing.Union[float, NoneType], ex_type: typing.Type[Exception] = None, ex_value: Exception = None, ex_traceback: typing.Any = None, first_phase_is_wait: bool = True, last_phase_is_kill: bool = True)
def __init__(self,
             phase_time_limit: Optional[float],
             ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None,
             first_phase_is_wait: bool = True,
             last_phase_is_kill: bool = True,
             ) -> None:
    """Store every argument unchanged as an attribute of the same name."""
    # Timing of a termination phase.
    self.phase_time_limit: Optional[float] = phase_time_limit
    # Phase toggles.
    self.first_phase_is_wait: bool = first_phase_is_wait
    self.last_phase_is_kill: bool = last_phase_is_kill
    # Exception triple (type, value, traceback).
    self.ex_type: Type[Exception] = ex_type
    self.ex_value: Exception = ex_value
    self.ex_traceback: Any = ex_traceback
class
TerminationReason(enum.IntEnum):
An enumeration.
success =
<TerminationReason.success: 0>
failure =
<TerminationReason.failure: 1>
timeout =
<TerminationReason.timeout: 2>
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
async def
atask_graceful_destroyer( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None) -> None:
async def atask_graceful_destroyer(
        i: Interface,
        task: Task,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        ) -> None:
    """Destroy *task* through ``agraceful_coro_destroyer`` using the given settings.

    Args:
        i: interface forwarded to ``agraceful_coro_destroyer``.
        task: task whose ``coro_id`` identifies the coroutine to destroy.
        tree: forwarded unchanged to ``agraceful_coro_destroyer``.
        graceful_termination_settings: source of the time limit, exception
            triple and phase toggles. Despite the ``None`` default, its
            attributes are read unconditionally, so ``None`` raises
            ``AttributeError``.
    """
    settings = graceful_termination_settings
    await agraceful_coro_destroyer(
        i,
        settings.phase_time_limit,
        task.coro_id,
        settings.ex_type,
        settings.ex_value,
        settings.ex_traceback,
        tree,
        settings.first_phase_is_wait,
        settings.last_phase_is_kill,
    )
async def
aterminate_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Terminate the not-yet-done tasks if the policy for *termination_reason* asks for it.

    Args:
        i: interface used to issue the service requests.
        tasks: candidate tasks; those whose ``done`` is truthy are left alone.
        termination_reason: why the wait ended; selects which policy flag applies.
        tree: forwarded to the kill / graceful-destroy parameters.
        graceful_termination_settings: when ``None`` the tasks are killed via
            ``KillCoroList``; otherwise one ``atask_graceful_destroyer``
            coroutine per task is started via ``PutCoroList``.
        terminate_other_tasks: when not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags.
        terminate_other_tasks_on_success: policy flag for ``TerminationReason.success``.
        terminate_other_tasks_on_failure: policy flag for ``TerminationReason.failure``.
        terminate_other_tasks_on_timeout: policy flag for ``TerminationReason.timeout``.
        wait_for_termination: graceful mode only; wait for the destroyer tasks.

    Returns:
        ``True`` when a termination request was issued, ``False`` otherwise.
    """
    if not tasks:
        return False

    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    termination_requested = \
        ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
        ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout)
    if not termination_requested:
        # NOTE(review): the original layout left this return value ambiguous;
        # False matches the other "nothing was terminated" exits. Confirm.
        return False

    # NOTE(review): relies on ``task.done`` being an attribute/property, not a method; confirm.
    pending_tasks: List[Task] = [task for task in tasks if not task.done]
    if not pending_tasks:
        return False

    if graceful_termination_settings is None:
        kill_params: List[KillSingleCoroParams] = [KillSingleCoroParams(task.coro_id, tree) for task in pending_tasks]
        # This call used to pass ``termination_tasks``, a local that is never
        # bound on this branch (UnboundLocalError), and was not awaited like the
        # other service requests in this coroutine.
        await i(KillCoroList, kill_params)
    else:
        destroyer_params: List[PSCP] = [
            PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings)
            for task in pending_tasks
        ]
        termination_tasks: List[Task] = await i(PutCoroList, destroyer_params, True)
        if wait_for_termination:
            await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)

    return True
async def
aterminate_tasks_implicit( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``aterminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged; the result is returned as is.
    """
    return await aterminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
aterminate_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Terminate the not-yet-done tasks if the policy for *termination_reason* asks for it.

    Args:
        i: interface used to issue the service requests.
        tasks: candidate tasks; those whose ``done`` is truthy are left alone.
        termination_reason: why the wait ended; selects which policy flag applies.
        tree: forwarded to the kill / graceful-destroy parameters.
        graceful_termination_settings: when ``None`` the tasks are killed via
            ``KillCoroList``; otherwise one ``atask_graceful_destroyer``
            coroutine per task is started via ``PutCoroList``.
        terminate_other_tasks: when not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags.
        terminate_other_tasks_on_success: policy flag for ``TerminationReason.success``.
        terminate_other_tasks_on_failure: policy flag for ``TerminationReason.failure``.
        terminate_other_tasks_on_timeout: policy flag for ``TerminationReason.timeout``.
        wait_for_termination: graceful mode only; wait for the destroyer tasks.

    Returns:
        ``True`` when a termination request was issued, ``False`` otherwise.
    """
    if not tasks:
        return False

    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    termination_requested = \
        ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
        ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout)
    if not termination_requested:
        # NOTE(review): the original layout left this return value ambiguous;
        # False matches the other "nothing was terminated" exits. Confirm.
        return False

    # NOTE(review): relies on ``task.done`` being an attribute/property, not a method; confirm.
    pending_tasks: List[Task] = [task for task in tasks if not task.done]
    if not pending_tasks:
        return False

    if graceful_termination_settings is None:
        kill_params: List[KillSingleCoroParams] = [KillSingleCoroParams(task.coro_id, tree) for task in pending_tasks]
        # This call used to pass ``termination_tasks``, a local that is never
        # bound on this branch (UnboundLocalError), and was not awaited like the
        # other service requests in this coroutine.
        await i(KillCoroList, kill_params)
    else:
        destroyer_params: List[PSCP] = [
            PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings)
            for task in pending_tasks
        ]
        termination_tasks: List[Task] = await i(PutCoroList, destroyer_params, True)
        if wait_for_termination:
            await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)

    return True
async def
aterminate_tasks_im( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``aterminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged; the result is returned as is.
    """
    return await aterminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
terminate_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Synchronous entry point: run ``aterminate_tasks_explicit`` through the ``RunCoro`` service.

    All arguments after *i* are forwarded unchanged.
    NOTE(review): the interface itself is not forwarded; presumably ``RunCoro``
    supplies it to the coroutine. Confirm.

    Returns:
        Whatever the ``RunCoro`` request yields. The result used to be
        discarded, so this function (and ``terminate_tasks_implicit``) always
        returned ``None`` despite the ``bool`` annotation.
    """
    return i(
        RunCoro, aterminate_tasks_explicit,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
terminate_tasks_implicit( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``terminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged; the result is returned as is.
    """
    return terminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
terminate_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Synchronous entry point: run ``aterminate_tasks_explicit`` through the ``RunCoro`` service.

    All arguments after *i* are forwarded unchanged.
    NOTE(review): the interface itself is not forwarded; presumably ``RunCoro``
    supplies it to the coroutine. Confirm.

    Returns:
        Whatever the ``RunCoro`` request yields. The result used to be
        discarded, so this function (and ``terminate_tasks_implicit``) always
        returned ``None`` despite the ``bool`` annotation.
    """
    return i(
        RunCoro, aterminate_tasks_explicit,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
terminate_tasks_im( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``terminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged; the result is returned as is.
    """
    return terminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait until all *tasks* have reported (or *timeout* fires) and return their outcomes.

    Args:
        i: interface used for the event-bus wait and the termination step.
        tasks: a single task or a list of tasks.
        terminate_tree: forwarded as ``tree`` to ``aterminate_tasks_explicit``.
        graceful_termination_settings: forwarded to ``aterminate_tasks_explicit``.
        timeout: when not ``None``, ``NormalInfo.on_timeout`` is scheduled via
            ``timer_func_run_on`` with this value.
        terminate_other_tasks: forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: forwarded to ``aterminate_tasks_explicit``.

    Returns:
        The ``(coro_id, result, exception)`` tuples gathered from ``NormalInfo``.

    Raises:
        TasksListIsEmptyError: if *tasks* is empty.
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    normal_wait_id: str = f'wait_task__normal__{uuid4()}'
    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, normal_info.on_timeout)

    for another_task in tasks:
        another_task.add_on_done_handler(NormalOnDoneHandler(normal_info))

    # Suspend until NormalInfo publishes the completion event for this wait id.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(normal_wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = normal_info.gather()
    if normal_info.timeout:
        termination_reason: TerminationReason = TerminationReason.timeout
    elif normal_info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    # The coroutine variant must be awaited here; the synchronous
    # terminate_tasks_explicit() was awaited before, which cannot work because
    # its return value is not awaitable.
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results
async def
await_tasks_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``await_tasks_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged.

    Returns:
        Whatever ``await_tasks_explicit`` returns.
    """
    return await await_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait on ``tasks`` and return what ``NormalInfo.gather()`` collected.

    Args:
        i: Interface of the calling coroutine. Used for the event-bus wait
            and, through ``i._loop``, for scheduling the timeout callback.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded unchanged to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded unchanged to
            ``aterminate_tasks_explicit``.
        timeout: When not ``None``, ``NormalInfo.on_timeout`` is scheduled
            with this value via ``timer_func_run_on``.
        terminate_other_tasks: Forwarded unchanged to
            ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        The list produced by ``NormalInfo.gather()``.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    normal_wait_id: str = f'wait_task__normal__{uuid4()}'
    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, normal_info.on_timeout)

    for another_task in tasks:
        another_task.add_on_done_handler(NormalOnDoneHandler(normal_info))

    # Suspend until the event named by normal_wait_id arrives on the bus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(normal_wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = normal_info.gather()

    # Timeout takes precedence over failure; success is the fallback.
    if normal_info.timeout:
        termination_reason: TerminationReason = TerminationReason.timeout
    elif normal_info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    # This is a coroutine, so the async terminator must be awaited here,
    # exactly as the other async waiters in this module do.
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results
async def
await_tasks_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``await_tasks_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged.

    Returns:
        Whatever ``await_tasks_explicit`` returns.
    """
    return await await_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_explicit`` through a ``RunCoro`` request on ``i``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Passed through to ``await_tasks_explicit``.
        graceful_termination_settings: Passed through.
        timeout: Passed through.
        terminate_other_tasks: Passed through.
        terminate_other_tasks_on_success: Passed through.
        terminate_other_tasks_on_failure: Passed through.
        terminate_other_tasks_on_timeout: Passed through.
        wait_for_termination: Passed through.

    Returns:
        The value produced by the ``RunCoro`` request.
    """
    # The request result must be returned; otherwise this function and
    # wait_tasks_implicit always yield None despite the declared return type.
    return i(
        RunCoro,
        await_tasks_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged.

    Returns:
        Whatever ``wait_tasks_explicit`` returns.
    """
    return wait_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_explicit`` through a ``RunCoro`` request on ``i``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Passed through to ``await_tasks_explicit``.
        graceful_termination_settings: Passed through.
        timeout: Passed through.
        terminate_other_tasks: Passed through.
        terminate_other_tasks_on_success: Passed through.
        terminate_other_tasks_on_failure: Passed through.
        terminate_other_tasks_on_timeout: Passed through.
        wait_for_termination: Passed through.

    Returns:
        The value produced by the ``RunCoro`` request.
    """
    # The request result must be returned; otherwise this function and
    # wait_tasks_implicit always yield None despite the declared return type.
    return i(
        RunCoro,
        await_tasks_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged.

    Returns:
        Whatever ``wait_tasks_explicit`` returns.
    """
    return wait_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks_fastest_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Wait on ``tasks`` via ``FastestInfo`` and return its first gathered entry.

    Args:
        i: Interface of the calling coroutine. Used for the event-bus wait
            and, through ``i._loop``, for scheduling the timeout callback.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded unchanged to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded unchanged.
        timeout: When not ``None``, ``FastestInfo.on_timeout`` is scheduled
            with this value via ``timer_func_run_on``.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        ``(coro_id, result)`` from the first gathered entry, or
        ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
        Exception: The exception stored in the first gathered entry, if any.
            A ``TimeoutError`` paired with a non-``None`` coroutine id is
            replaced by ``SubTimeoutError``.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest__{uuid4()}'
    info: FastestInfo = FastestInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestOnDoneHandler(info))

    # Suspend until the event named by wait_id arrives on the bus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, exception = gathered[0]
    if exception is None:
        return coro_id, result

    # NOTE(review): presumably this separates a timeout raised inside a task
    # from the timeout of the wait itself - confirm against FastestInfo.
    if isinstance(exception, TimeoutError) and (coro_id is not None):
        exception = SubTimeoutError()

    raise exception
async def
await_tasks_fastest_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Call ``await_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged. The return annotation matches
    the delegate, which returns a ``(coro_id, result)`` pair rather than a
    list of result triples.

    Returns:
        Whatever ``await_tasks_fastest_explicit`` returns.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def
await_tasks_fastest( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Wait on ``tasks`` via ``FastestInfo`` and return its first gathered entry.

    Args:
        i: Interface of the calling coroutine. Used for the event-bus wait
            and, through ``i._loop``, for scheduling the timeout callback.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded unchanged to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded unchanged.
        timeout: When not ``None``, ``FastestInfo.on_timeout`` is scheduled
            with this value via ``timer_func_run_on``.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        ``(coro_id, result)`` from the first gathered entry, or
        ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
        Exception: The exception stored in the first gathered entry, if any.
            A ``TimeoutError`` paired with a non-``None`` coroutine id is
            replaced by ``SubTimeoutError``.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest__{uuid4()}'
    info: FastestInfo = FastestInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestOnDoneHandler(info))

    # Suspend until the event named by wait_id arrives on the bus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, exception = gathered[0]
    if exception is None:
        return coro_id, result

    # NOTE(review): presumably this separates a timeout raised inside a task
    # from the timeout of the wait itself - confirm against FastestInfo.
    if isinstance(exception, TimeoutError) and (coro_id is not None):
        exception = SubTimeoutError()

    raise exception
async def
await_tasks_fastest_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Call ``await_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged. The return annotation matches
    the delegate, which returns a ``(coro_id, result)`` pair rather than a
    list of result triples.

    Returns:
        Whatever ``await_tasks_fastest_explicit`` returns.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_explicit`` through a ``RunCoro`` request on ``i``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Passed through to ``await_tasks_fastest_explicit``.
        graceful_termination_settings: Passed through.
        timeout: Passed through.
        terminate_other_tasks: Passed through.
        terminate_other_tasks_on_success: Passed through.
        terminate_other_tasks_on_failure: Passed through.
        terminate_other_tasks_on_timeout: Passed through.
        wait_for_termination: Passed through.

    Returns:
        The value produced by the ``RunCoro`` request. It is annotated as the
        ``(coro_id, result)`` pair that ``await_tasks_fastest_explicit``
        returns.
    """
    # The request result must be returned; otherwise this function always
    # yields None despite the declared return type.
    return i(
        RunCoro,
        await_tasks_fastest_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged. The return annotation is the
    ``(coro_id, result)`` pair that the underlying
    ``await_tasks_fastest_explicit`` returns, not a list of result triples.

    Returns:
        Whatever ``wait_tasks_fastest_explicit`` returns.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_explicit`` through a ``RunCoro`` request on ``i``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Passed through to ``await_tasks_fastest_explicit``.
        graceful_termination_settings: Passed through.
        timeout: Passed through.
        terminate_other_tasks: Passed through.
        terminate_other_tasks_on_success: Passed through.
        terminate_other_tasks_on_failure: Passed through.
        terminate_other_tasks_on_timeout: Passed through.
        wait_for_termination: Passed through.

    Returns:
        The value produced by the ``RunCoro`` request. It is annotated as the
        ``(coro_id, result)`` pair that ``await_tasks_fastest_explicit``
        returns.
    """
    # The request result must be returned; otherwise this function always
    # yields None despite the declared return type.
    return i(
        RunCoro,
        await_tasks_fastest_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged. The return annotation is the
    ``(coro_id, result)`` pair that the underlying
    ``await_tasks_fastest_explicit`` returns, not a list of result triples.

    Returns:
        Whatever ``wait_tasks_fastest_explicit`` returns.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def
await_tasks_fastest_successful_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_successful_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Wait on ``tasks`` via ``FastestSuccessfulInfo`` and return its first gathered entry.

    Args:
        i: Interface of the calling coroutine. Used for the event-bus wait
            and, through ``i._loop``, for scheduling the timeout callback.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded unchanged to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded unchanged.
        timeout: When not ``None``, ``FastestSuccessfulInfo.on_timeout`` is
            scheduled with this value via ``timer_func_run_on``.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        ``(coro_id, result)`` from the first gathered entry, or
        ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
        Exception: The exception stored in the first gathered entry, if any.
            A ``TimeoutError`` paired with a non-``None`` coroutine id is
            replaced by ``SubTimeoutError``.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_successful__{uuid4()}'
    info: FastestSuccessfulInfo = FastestSuccessfulInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestSuccessfulOnDoneHandler(info))

    # Suspend until the event named by wait_id arrives on the bus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, exception = gathered[0]
    if exception is None:
        return coro_id, result

    # NOTE(review): presumably this separates a timeout raised inside a task
    # from the timeout of the wait itself - confirm against FastestSuccessfulInfo.
    if isinstance(exception, TimeoutError) and (coro_id is not None):
        exception = SubTimeoutError()

    raise exception
async def
await_tasks_fastest_successful_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_successful_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Call ``await_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    Every argument is passed through unchanged. The return annotation matches
    the delegate, which returns a ``(coro_id, result)`` pair rather than a
    list of result triples.

    Returns:
        Whatever ``await_tasks_fastest_successful_explicit`` returns.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def
await_tasks_fastest_successful( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_successful_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Wait for the "fastest successful" event of ``tasks`` and return the first gathered outcome.

    A ``FastestSuccessfulInfo`` tracker is created under a unique wait id, a
    ``FastestSuccessfulOnDoneHandler`` bound to it is attached to every task,
    and an ``AsyncEventBusRequest().wait(...)`` request for that id is awaited.
    Afterwards the tasks are passed to ``aterminate_tasks_explicit`` together
    with a termination reason derived from the tracker's ``timeout`` and
    ``failure`` flags.

    Args:
        i: Interface used to issue the service requests.
        tasks: A single ``Task`` (wrapped into a list) or a list of tasks.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            through ``timer_func_run_on`` with this value.
        terminate_other_tasks: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: Forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        ``(coro_id, result)`` from the first gathered entry, or ``(None, None)``
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        SubTimeoutError: The first gathered entry holds a ``TimeoutError``
            together with a non-``None`` coro id.
        Exception: Any other exception held by the first gathered entry.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_successful__{uuid4()}'
    tracker: FastestSuccessfulInfo = FastestSuccessfulInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    # Each task receives its own handler instance; all of them share one tracker.
    for task in task_list:
        task.add_on_done_handler(FastestSuccessfulOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    outcomes: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # The timeout flag is checked first, then failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not outcomes:
        return None, None

    first_id, first_result, first_error = outcomes[0]
    if first_error is None:
        return first_id, first_result

    # A TimeoutError paired with a concrete coro id is replaced by SubTimeoutError
    # (presumably to tell a task's own timeout from the wait timeout — confirm).
    if (first_id is not None) and isinstance(first_error, TimeoutError):
        raise SubTimeoutError()

    raise first_error
async def
await_tasks_fastest_successful_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_successful_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``await_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged and the awaited result of the
    explicit variant is returned as is.

    NOTE(review): the return annotation is kept from the original, but the
    explicit variant is annotated ``Tuple[CoroID, Any]`` — confirm which is intended.
    """
    return await await_tasks_fastest_successful_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_fastest_successful_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_successful_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_successful_explicit`` through a ``RunCoro`` request.

    Non-async counterpart of ``await_tasks_fastest_successful_explicit``: the
    coroutine function and all arguments are handed to ``i(RunCoro, ...)``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the coroutine.
        graceful_termination_settings: Forwarded to the coroutine.
        timeout: Forwarded to the coroutine.
        terminate_other_tasks: Forwarded to the coroutine.
        terminate_other_tasks_on_success: Forwarded to the coroutine.
        terminate_other_tasks_on_failure: Forwarded to the coroutine.
        terminate_other_tasks_on_timeout: Forwarded to the coroutine.
        wait_for_termination: Forwarded to the coroutine.

    Returns:
        The value produced by the ``RunCoro`` request; expected to be the
        coroutine's ``(coro_id, result)`` pair.
        NOTE(review): confirm that ``RunCoro`` hands back the coroutine's return value.
    """
    # Fix: this used to delegate to ``await_tasks_explicit`` (copy-paste slip) and
    # dropped the request's result, so callers always received ``None``.
    return i(
        RunCoro,
        await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_successful_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_successful_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged; whatever the explicit variant
    returns is returned as is.

    NOTE(review): the return annotation is kept from the original — confirm it
    against what the explicit variant actually returns.
    """
    return wait_tasks_fastest_successful_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_fastest_successful( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_successful_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_successful_explicit`` through a ``RunCoro`` request.

    Non-async counterpart of ``await_tasks_fastest_successful_explicit``: the
    coroutine function and all arguments are handed to ``i(RunCoro, ...)``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the coroutine.
        graceful_termination_settings: Forwarded to the coroutine.
        timeout: Forwarded to the coroutine.
        terminate_other_tasks: Forwarded to the coroutine.
        terminate_other_tasks_on_success: Forwarded to the coroutine.
        terminate_other_tasks_on_failure: Forwarded to the coroutine.
        terminate_other_tasks_on_timeout: Forwarded to the coroutine.
        wait_for_termination: Forwarded to the coroutine.

    Returns:
        The value produced by the ``RunCoro`` request; expected to be the
        coroutine's ``(coro_id, result)`` pair.
        NOTE(review): confirm that ``RunCoro`` hands back the coroutine's return value.
    """
    # Fix: this used to delegate to ``await_tasks_explicit`` (copy-paste slip) and
    # dropped the request's result, so callers always received ``None``.
    return i(
        RunCoro,
        await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_successful_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_successful_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged; whatever the explicit variant
    returns is returned as is.

    NOTE(review): the return annotation is kept from the original — confirm it
    against what the explicit variant actually returns.
    """
    return wait_tasks_fastest_successful_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks_fastest_exception_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Exception]:
async def await_tasks_fastest_exception_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Wait for the "fastest exception" event of ``tasks`` and return the first gathered exception.

    A ``FastestExceptionInfo`` tracker is created under a unique wait id, a
    ``FastestExceptionOnDoneHandler`` bound to it is attached to every task,
    and an ``AsyncEventBusRequest().wait(...)`` request for that id is awaited.
    Afterwards the tasks are passed to ``aterminate_tasks_explicit`` together
    with a termination reason derived from the tracker's ``timeout`` and
    ``failure`` flags. Unlike the "successful" variant, the exception is
    returned to the caller instead of being raised.

    Args:
        i: Interface used to issue the service requests.
        tasks: A single ``Task`` (wrapped into a list) or a list of tasks.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            through ``timer_func_run_on`` with this value.
        terminate_other_tasks: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: Forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        ``(coro_id, exception)`` from the first gathered entry, where a
        ``TimeoutError`` paired with a non-``None`` coro id is replaced by a
        fresh ``SubTimeoutError``; ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_exception__{uuid4()}'
    tracker: FastestExceptionInfo = FastestExceptionInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    # Each task receives its own handler instance; all of them share one tracker.
    for task in task_list:
        task.add_on_done_handler(FastestExceptionOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    outcomes: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # The timeout flag is checked first, then failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not outcomes:
        return None, None

    # The result slot of the entry is not used by this variant.
    first_id, _, first_error = outcomes[0]
    if (first_id is not None) and isinstance(first_error, TimeoutError):
        first_error = SubTimeoutError()

    return first_id, first_error
async def
await_tasks_fastest_exception_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_exception_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Call ``await_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    Args:
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the explicit variant.
        graceful_termination_settings: Forwarded to the explicit variant.
        timeout: Forwarded to the explicit variant.
        terminate_other_tasks: Forwarded to the explicit variant.
        terminate_other_tasks_on_success: Forwarded to the explicit variant.
        terminate_other_tasks_on_failure: Forwarded to the explicit variant.
        terminate_other_tasks_on_timeout: Forwarded to the explicit variant.
        wait_for_termination: Forwarded to the explicit variant.

    Returns:
        The ``(coro_id, exception)`` pair returned by
        ``await_tasks_fastest_exception_explicit``.
    """
    i: Interface = current_interface()
    # Fix: this wrapper used to delegate to ``await_tasks_explicit`` (copy-paste
    # slip), so the "fastest exception" logic was never executed.
    return await await_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def
await_tasks_fastest_exception( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Exception]:
async def await_tasks_fastest_exception_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Wait for the "fastest exception" event of ``tasks`` and return the first gathered exception.

    A ``FastestExceptionInfo`` tracker is created under a unique wait id, a
    ``FastestExceptionOnDoneHandler`` bound to it is attached to every task,
    and an ``AsyncEventBusRequest().wait(...)`` request for that id is awaited.
    Afterwards the tasks are passed to ``aterminate_tasks_explicit`` together
    with a termination reason derived from the tracker's ``timeout`` and
    ``failure`` flags. Unlike the "successful" variant, the exception is
    returned to the caller instead of being raised.

    Args:
        i: Interface used to issue the service requests.
        tasks: A single ``Task`` (wrapped into a list) or a list of tasks.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            through ``timer_func_run_on`` with this value.
        terminate_other_tasks: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: Forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        ``(coro_id, exception)`` from the first gathered entry, where a
        ``TimeoutError`` paired with a non-``None`` coro id is replaced by a
        fresh ``SubTimeoutError``; ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_exception__{uuid4()}'
    tracker: FastestExceptionInfo = FastestExceptionInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    # Each task receives its own handler instance; all of them share one tracker.
    for task in task_list:
        task.add_on_done_handler(FastestExceptionOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    outcomes: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # The timeout flag is checked first, then failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not outcomes:
        return None, None

    # The result slot of the entry is not used by this variant.
    first_id, _, first_error = outcomes[0]
    if (first_id is not None) and isinstance(first_error, TimeoutError):
        first_error = SubTimeoutError()

    return first_id, first_error
async def
await_tasks_fastest_exception_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_exception_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Call ``await_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    Args:
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the explicit variant.
        graceful_termination_settings: Forwarded to the explicit variant.
        timeout: Forwarded to the explicit variant.
        terminate_other_tasks: Forwarded to the explicit variant.
        terminate_other_tasks_on_success: Forwarded to the explicit variant.
        terminate_other_tasks_on_failure: Forwarded to the explicit variant.
        terminate_other_tasks_on_timeout: Forwarded to the explicit variant.
        wait_for_termination: Forwarded to the explicit variant.

    Returns:
        The ``(coro_id, exception)`` pair returned by
        ``await_tasks_fastest_exception_explicit``.
    """
    i: Interface = current_interface()
    # Fix: this wrapper used to delegate to ``await_tasks_explicit`` (copy-paste
    # slip), so the "fastest exception" logic was never executed.
    return await await_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_exception_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_exception_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Run ``await_tasks_fastest_exception_explicit`` through a ``RunCoro`` request.

    Non-async counterpart of ``await_tasks_fastest_exception_explicit``: the
    coroutine function and all arguments are handed to ``i(RunCoro, ...)``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the coroutine.
        graceful_termination_settings: Forwarded to the coroutine.
        timeout: Forwarded to the coroutine.
        terminate_other_tasks: Forwarded to the coroutine.
        terminate_other_tasks_on_success: Forwarded to the coroutine.
        terminate_other_tasks_on_failure: Forwarded to the coroutine.
        terminate_other_tasks_on_timeout: Forwarded to the coroutine.
        wait_for_termination: Forwarded to the coroutine.

    Returns:
        The value produced by the ``RunCoro`` request; expected to be the
        coroutine's ``(coro_id, exception)`` pair.
        NOTE(review): confirm that ``RunCoro`` hands back the coroutine's return value.
    """
    # Fix: the request's result used to be discarded, so this function (and the
    # implicit wrapper that returns its value) always produced ``None``.
    return i(
        RunCoro,
        await_tasks_fastest_exception_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_exception_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_exception_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged; whatever the explicit variant
    returns is returned as is.

    NOTE(review): the return annotation is kept from the original — confirm it
    against what the explicit variant actually returns.
    """
    return wait_tasks_fastest_exception_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_fastest_exception( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_exception_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Exception]:
    """Run ``await_tasks_fastest_exception_explicit`` through a ``RunCoro`` request.

    Non-async counterpart of ``await_tasks_fastest_exception_explicit``: the
    coroutine function and all arguments are handed to ``i(RunCoro, ...)``.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single ``Task`` or a list of tasks.
        terminate_tree: Forwarded to the coroutine.
        graceful_termination_settings: Forwarded to the coroutine.
        timeout: Forwarded to the coroutine.
        terminate_other_tasks: Forwarded to the coroutine.
        terminate_other_tasks_on_success: Forwarded to the coroutine.
        terminate_other_tasks_on_failure: Forwarded to the coroutine.
        terminate_other_tasks_on_timeout: Forwarded to the coroutine.
        wait_for_termination: Forwarded to the coroutine.

    Returns:
        The value produced by the ``RunCoro`` request; expected to be the
        coroutine's ``(coro_id, exception)`` pair.
        NOTE(review): confirm that ``RunCoro`` hands back the coroutine's return value.
    """
    # Fix: the request's result used to be discarded, so this function (and the
    # implicit wrapper that returns its value) always produced ``None``.
    return i(
        RunCoro,
        await_tasks_fastest_exception_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_fastest_exception_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_exception_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged; whatever the explicit variant
    returns is returned as is.

    NOTE(review): the return annotation is kept from the original — confirm it
    against what the explicit variant actually returns.
    """
    return wait_tasks_fastest_exception_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks_fastest_custom_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> Tuple[CoroID, Any]:
    """Wait for the "fastest custom" event of ``tasks`` and return the first gathered outcome.

    A ``FastestCustomInfo`` tracker is created under a unique wait id with the
    supplied ``result_criteria_handler``, a ``FastestCustomOnDoneHandler``
    bound to it is attached to every task, and an
    ``AsyncEventBusRequest().wait(...)`` request for that id is awaited.
    Afterwards the tasks are passed to ``aterminate_tasks_explicit`` together
    with a termination reason derived from the tracker's ``timeout`` and
    ``failure`` flags.

    Args:
        i: Interface used to issue the service requests.
        tasks: A single ``Task`` (wrapped into a list) or a list of tasks.
        result_criteria_handler: Callable handed to ``FastestCustomInfo``;
            presumably it decides whether a finished task's
            ``(coro_id, result, exception)`` qualifies — confirm against
            ``FastestCustomInfo``.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            through ``timer_func_run_on`` with this value.
        terminate_other_tasks: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: Forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        ``(coro_id, result)`` from the first gathered entry, or ``(None, None)``
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        SubTimeoutError: The first gathered entry holds a ``TimeoutError``
            together with a non-``None`` coro id.
        Exception: Any other exception held by the first gathered entry.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_custom__{uuid4()}'
    tracker: FastestCustomInfo = FastestCustomInfo(wait_id, task_list, result_criteria_handler)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    # Each task receives its own handler instance; all of them share one tracker.
    for task in task_list:
        task.add_on_done_handler(FastestCustomOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    outcomes: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # The timeout flag is checked first, then failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not outcomes:
        return None, None

    first_id, first_result, first_error = outcomes[0]
    if first_error is None:
        return first_id, first_result

    # A TimeoutError paired with a concrete coro id is replaced by SubTimeoutError
    # (presumably to tell a task's own timeout from the wait timeout — confirm).
    if (first_id is not None) and isinstance(first_error, TimeoutError):
        raise SubTimeoutError()

    raise first_error
async def
await_tasks_fastest_custom_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``await_tasks_fastest_custom_explicit`` with the interface from ``current_interface()``.

    Every argument, including ``result_criteria_handler``, is forwarded
    unchanged and the awaited result of the explicit variant is returned as is.

    NOTE(review): the return annotation is kept from the original, but the
    explicit variant is annotated ``Tuple[CoroID, Any]`` — confirm which is intended.
    """
    return await await_tasks_fastest_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def await_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the "fastest custom" outcome over ``tasks`` and return it.

    A single ``Task`` is wrapped into a one-element list.  A
    ``FastestCustomInfo`` is created for a freshly generated wait id, a
    ``FastestCustomOnDoneHandler`` bound to it is attached to every task, and
    the coroutine then awaits an ``AsyncEventBus`` wait request for that id.
    When ``timeout`` is not ``None``, the info object's ``on_timeout`` is
    scheduled through ``timer_func_run_on`` before the handlers are attached.

    After the wait, ``aterminate_tasks_explicit`` is awaited with a
    ``TerminationReason`` derived from the info object's ``timeout`` and
    ``failure`` flags (``success`` when neither is set) and with the
    termination-related arguments given to this function.

    Args:
        i: Interface used for the event-bus request; its ``_loop`` is used
            when scheduling the timeout callback.
        tasks: One task or a list of tasks.
        result_criteria_handler: Passed to ``FastestCustomInfo``.
        terminate_tree, graceful_termination_settings, terminate_other_tasks,
        terminate_other_tasks_on_success, terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout, wait_for_termination: Forwarded
            unchanged to ``aterminate_tasks_explicit``.
        timeout: Delay handed to ``timer_func_run_on``; ``None`` disables it.

    Returns:
        ``(coro_id, result)`` of the first gathered entry, or
        ``(None, None)`` when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        SubTimeoutError: The first gathered entry carries a ``TimeoutError``
            and a ``coro_id`` that is not ``None``.
        Exception: Any other exception stored in the first gathered entry.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_custom__{uuid4()}'
    info: FastestCustomInfo = FastestCustomInfo(wait_id, task_list, result_criteria_handler)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    # Every task gets its own handler instance sharing the same info object.
    for task in task_list:
        task.add_on_done_handler(FastestCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, exception = gathered[0]
    if exception is None:
        return coro_id, result

    # A TimeoutError attached to a concrete coroutine id is re-raised as
    # SubTimeoutError instead of the original exception object.
    if (coro_id is not None) and isinstance(exception, TimeoutError):
        exception = SubTimeoutError()

    raise exception


# This listing documents the function above under its second public name.
await_tasks_fastest_custom = await_tasks_fastest_custom_explicit
async def await_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_custom_explicit`` on the current interface.

    The interface is obtained from ``current_interface()``; every other
    argument is forwarded positionally and unchanged.

    Returns:
        The value produced by ``await_tasks_fastest_custom_explicit``, which
        is annotated as a ``(coro_id, result)`` pair.  The annotation used
        here previously (``List[Tuple[CoroID, Any, Exception]]``) did not
        match that value and has been corrected.

    Raises:
        Anything raised by ``await_tasks_fastest_custom_explicit``.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_custom_explicit(
        i,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


# This listing documents the function above under its second public name.
await_tasks_fastest_custom_im = await_tasks_fastest_custom_implicit
def wait_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Issue a ``RunCoro`` request for ``await_tasks_fastest_custom_explicit``.

    All arguments except ``i`` are forwarded positionally to the request.

    The value of the ``i(RunCoro, ...)`` call used to be discarded, so this
    function always returned ``None`` regardless of its annotation.  It is
    now returned to the caller, and the annotation follows the one declared
    by ``await_tasks_fastest_custom_explicit``.

    Returns:
        Whatever the ``RunCoro`` request evaluates to.
        NOTE(review): presumably the result of the coroutine, i.e. a
        ``(coro_id, result)`` pair - confirm against the RunCoro service.
    """
    # NOTE(review): `i` is not part of the forwarded arguments; presumably
    # RunCoro supplies the interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_fastest_custom_explicit,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_custom_explicit`` with the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_custom_explicit`` returns.
        NOTE(review): confirm that this matches the annotated return type.
    """
    return wait_tasks_fastest_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Issue a ``RunCoro`` request for ``await_tasks_fastest_custom_explicit``.

    All arguments except ``i`` are forwarded positionally to the request.

    The value of the ``i(RunCoro, ...)`` call used to be discarded, so this
    function always returned ``None`` regardless of its annotation.  It is
    now returned to the caller, and the annotation follows the one declared
    by ``await_tasks_fastest_custom_explicit``.

    Returns:
        Whatever the ``RunCoro`` request evaluates to.
        NOTE(review): presumably the result of the coroutine, i.e. a
        ``(coro_id, result)`` pair - confirm against the RunCoro service.
    """
    # NOTE(review): `i` is not part of the forwarded arguments; presumably
    # RunCoro supplies the interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_fastest_custom_explicit,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


# This listing documents the function above under its second public name.
wait_tasks_fastest_custom = wait_tasks_fastest_custom_explicit
def wait_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_custom_explicit`` with the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_custom_explicit`` returns.
        NOTE(review): confirm that this matches the annotated return type.
    """
    return wait_tasks_fastest_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )


# This listing documents the function above under its second public name.
wait_tasks_fastest_custom_im = wait_tasks_fastest_custom_implicit
async def await_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait for the "atomic" outcome over ``tasks`` and return what was gathered.

    A single ``Task`` is wrapped into a one-element list.  An ``AtomicInfo``
    is created for a freshly generated wait id, an ``AtomicOnDoneHandler``
    bound to it is attached to every task, and the coroutine then awaits an
    ``AsyncEventBus`` wait request for that id.  When ``timeout`` is not
    ``None``, the info object's ``on_timeout`` is scheduled through
    ``timer_func_run_on`` before the handlers are attached.

    After the wait, ``aterminate_tasks_explicit`` is awaited with a
    ``TerminationReason`` derived from the info object's ``timeout`` and
    ``failure`` flags (``success`` when neither is set).

    Args:
        i: Interface used for the event-bus request; its ``_loop`` is used
            when scheduling the timeout callback.
        tasks: One task or a list of tasks.
        terminate_other_tasks: When not ``None``, overrides the three
            ``terminate_other_tasks_on_*`` flags before they are forwarded.
        terminate_tree, graceful_termination_settings, wait_for_termination:
            Forwarded unchanged to ``aterminate_tasks_explicit``.
        timeout: Delay handed to ``timer_func_run_on``; ``None`` disables it.

    Returns:
        The list produced by ``AtomicInfo.gather()``.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic__{uuid4()}'
    info: AtomicInfo = AtomicInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    # Every task gets its own handler instance sharing the same info object.
    for task in task_list:
        task.add_on_done_handler(AtomicOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
async def await_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_explicit`` using the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``await_tasks_atomic_explicit`` returns.

    Raises:
        Anything raised by ``await_tasks_atomic_explicit``.
    """
    return await await_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def await_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait for the "atomic" outcome over ``tasks`` and return what was gathered.

    A single ``Task`` is wrapped into a one-element list.  An ``AtomicInfo``
    is created for a freshly generated wait id, an ``AtomicOnDoneHandler``
    bound to it is attached to every task, and the coroutine then awaits an
    ``AsyncEventBus`` wait request for that id.  When ``timeout`` is not
    ``None``, the info object's ``on_timeout`` is scheduled through
    ``timer_func_run_on`` before the handlers are attached.

    After the wait, ``aterminate_tasks_explicit`` is awaited with a
    ``TerminationReason`` derived from the info object's ``timeout`` and
    ``failure`` flags (``success`` when neither is set).

    Args:
        i: Interface used for the event-bus request; its ``_loop`` is used
            when scheduling the timeout callback.
        tasks: One task or a list of tasks.
        terminate_other_tasks: When not ``None``, overrides the three
            ``terminate_other_tasks_on_*`` flags before they are forwarded.
        terminate_tree, graceful_termination_settings, wait_for_termination:
            Forwarded unchanged to ``aterminate_tasks_explicit``.
        timeout: Delay handed to ``timer_func_run_on``; ``None`` disables it.

    Returns:
        The list produced by ``AtomicInfo.gather()``.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic__{uuid4()}'
    info: AtomicInfo = AtomicInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    # Every task gets its own handler instance sharing the same info object.
    for task in task_list:
        task.add_on_done_handler(AtomicOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered


# This listing documents the function above under its second public name.
await_tasks_atomic = await_tasks_atomic_explicit
async def await_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_explicit`` using the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``await_tasks_atomic_explicit`` returns.

    Raises:
        Anything raised by ``await_tasks_atomic_explicit``.
    """
    return await await_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )


# This listing documents the function above under its second public name.
await_tasks_atomic_im = await_tasks_atomic_implicit
def wait_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Issue a ``RunCoro`` request for ``await_tasks_atomic_explicit``.

    All arguments except ``i`` are forwarded positionally to the request.

    The value of the ``i(RunCoro, ...)`` call used to be discarded, so this
    function always returned ``None`` in spite of its annotated return type.
    It is now returned to the caller.

    Returns:
        Whatever the ``RunCoro`` request evaluates to.
        NOTE(review): presumably the list returned by
        ``await_tasks_atomic_explicit`` - confirm against the RunCoro service.
    """
    # NOTE(review): `i` is not part of the forwarded arguments; presumably
    # RunCoro supplies the interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_atomic_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_explicit`` with the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_atomic_explicit`` returns.
        NOTE(review): confirm that this matches the annotated return type.
    """
    return wait_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Issue a ``RunCoro`` request for ``await_tasks_atomic_explicit``.

    All arguments except ``i`` are forwarded positionally to the request.

    The value of the ``i(RunCoro, ...)`` call used to be discarded, so this
    function always returned ``None`` in spite of its annotated return type.
    It is now returned to the caller.

    Returns:
        Whatever the ``RunCoro`` request evaluates to.
        NOTE(review): presumably the list returned by
        ``await_tasks_atomic_explicit`` - confirm against the RunCoro service.
    """
    # NOTE(review): `i` is not part of the forwarded arguments; presumably
    # RunCoro supplies the interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_atomic_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )


# This listing documents the function above under its second public name.
wait_tasks_atomic = wait_tasks_atomic_explicit
def wait_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_explicit`` with the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_atomic_explicit`` returns.
        NOTE(review): confirm that this matches the annotated return type.
    """
    return wait_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )


# This listing documents the function above under its second public name.
wait_tasks_atomic_im = wait_tasks_atomic_implicit
async def await_tasks_atomic_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait for the "atomic custom" outcome over ``tasks`` and return what was gathered.

    A single ``Task`` is wrapped into a one-element list.  An
    ``AtomicCustomInfo`` is created for a freshly generated wait id, an
    ``AtomicCustomOnDoneHandler`` bound to it is attached to every task, and
    the coroutine then awaits an ``AsyncEventBus`` wait request for that id.
    When ``timeout`` is not ``None``, the info object's ``on_timeout`` is
    scheduled through ``timer_func_run_on`` before the handlers are attached.

    After the wait, ``aterminate_tasks_explicit`` is awaited with a
    ``TerminationReason`` derived from the info object's ``timeout`` and
    ``failure`` flags (``success`` when neither is set).

    Args:
        i: Interface used for the event-bus request; its ``_loop`` is used
            when scheduling the timeout callback.
        tasks: One task or a list of tasks.
        result_criteria_handler: Passed to ``AtomicCustomInfo``.
        first_failed_result_formatter: Passed to ``AtomicCustomInfo``.
        terminate_other_tasks: When not ``None``, overrides the three
            ``terminate_other_tasks_on_*`` flags before they are forwarded.
        terminate_tree, graceful_termination_settings, wait_for_termination:
            Forwarded unchanged to ``aterminate_tasks_explicit``.
        timeout: Delay handed to ``timer_func_run_on``; ``None`` disables it.

    Returns:
        The list produced by ``AtomicCustomInfo.gather()``.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic_custom__{uuid4()}'
    info: AtomicCustomInfo = AtomicCustomInfo(
        wait_id,
        task_list,
        result_criteria_handler,
        first_failed_result_formatter,
    )

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    # Every task gets its own handler instance sharing the same info object.
    for task in task_list:
        task.add_on_done_handler(AtomicCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
async def await_tasks_atomic_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_custom_explicit`` using the current interface.

    The interface comes from ``current_interface()``; all other arguments
    are handed over unchanged.

    Returns:
        Whatever ``await_tasks_atomic_custom_explicit`` returns.

    Raises:
        Anything raised by ``await_tasks_atomic_custom_explicit``.
    """
    return await await_tasks_atomic_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def
await_tasks_atomic_custom( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False):
async def await_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait on ``tasks`` through an ``AtomicCustomInfo`` tracker and return its gathered results.

    Args:
        i: Interface used to issue the ``AsyncEventBus`` wait request and
            passed on to ``aterminate_tasks_explicit``.
        tasks: A single ``Task`` (wrapped into a one-element list) or a list
            of tasks.
        result_criteria_handler: Passed to ``AtomicCustomInfo``.
        first_failed_result_formatter: Passed to ``AtomicCustomInfo``.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to
            ``aterminate_tasks_explicit``.
        timeout: When not ``None``, ``AtomicCustomInfo.on_timeout`` is
            scheduled through ``timer_func_run_on`` with this delay.
        terminate_other_tasks: When not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags with its value.
        terminate_other_tasks_on_success: Forwarded to
            ``aterminate_tasks_explicit`` (after the override above).
        terminate_other_tasks_on_failure: Same as above.
        terminate_other_tasks_on_timeout: Same as above.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        The value of ``AtomicCustomInfo.gather()``, taken before the
        termination step runs.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        # A single switch wins over the three per-outcome flags.
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    # Unique event name so that concurrent waits do not share an event id.
    atomic_custom_wait_id: str = f'wait_task__atomic_custom__{uuid4()}'
    atomic_custom_info: AtomicCustomInfo = AtomicCustomInfo(
        atomic_custom_wait_id,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
    )

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(
            get_interface_and_loop_with_explicit_loop(i._loop),
            timeout,
            atomic_custom_info.on_timeout,
        )

    # Each task gets its own handler instance bound to the shared tracker.
    for another_task in tasks:
        another_task.add_on_done_handler(AtomicCustomOnDoneHandler(atomic_custom_info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(atomic_custom_wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = atomic_custom_info.gather()

    # Timeout is checked first, so it takes precedence over failure.
    termination_reason: TerminationReason
    if atomic_custom_info.timeout:
        termination_reason = TerminationReason.timeout
    elif atomic_custom_info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results
async def
await_tasks_atomic_custom_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_atomic_custom_explicit`` on the current interface.

    The interface is obtained from ``current_interface()``; every other
    argument is forwarded unchanged, and the awaited result of the explicit
    variant is returned as-is.
    """
    return await await_tasks_atomic_custom_explicit(
        current_interface(),
        tasks=tasks,
        result_criteria_handler=result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_atomic_custom_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Synchronous counterpart of ``await_tasks_atomic_custom_explicit``.

    Issues a ``RunCoro`` request through ``i`` for
    ``await_tasks_atomic_custom_explicit`` with all remaining arguments
    forwarded positionally, and returns the response of that request.

    Returns:
        Whatever ``i(RunCoro, ...)`` yields - presumably the result of the
        executed coroutine (TODO confirm against the RunCoro service).
    """
    # Fix: the response used to be dropped, so this function always returned
    # None despite its declared return type (and so did the implicit wrapper).
    # NOTE(review): `i` is not forwarded as an argument here; presumably the
    # scheduler supplies the Interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_atomic_custom_explicit,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_atomic_custom_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_custom_explicit`` on the current interface.

    The interface comes from ``current_interface()``; all other arguments are
    forwarded unchanged. The return value is whatever the explicit variant
    returns.
    """
    return wait_tasks_atomic_custom_explicit(
        current_interface(),
        tasks=tasks,
        result_criteria_handler=result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_tasks_atomic_custom( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Synchronous counterpart of ``await_tasks_atomic_custom_explicit``.

    Issues a ``RunCoro`` request through ``i`` for
    ``await_tasks_atomic_custom_explicit`` with all remaining arguments
    forwarded positionally, and returns the response of that request.

    Returns:
        Whatever ``i(RunCoro, ...)`` yields - presumably the result of the
        executed coroutine (TODO confirm against the RunCoro service).
    """
    # Fix: the response used to be dropped, so this function always returned
    # None despite its declared return type (and so did the implicit wrapper).
    # NOTE(review): `i` is not forwarded as an argument here; presumably the
    # scheduler supplies the Interface to the coroutine itself - confirm.
    return i(
        RunCoro,
        await_tasks_atomic_custom_explicit,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def
wait_tasks_atomic_custom_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None, terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_custom_explicit`` on the current interface.

    The interface comes from ``current_interface()``; all other arguments are
    forwarded unchanged. The return value is whatever the explicit variant
    returns.
    """
    return wait_tasks_atomic_custom_explicit(
        current_interface(),
        tasks=tasks,
        result_criteria_handler=result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def
wait_task_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Wait on a uniquely named ``AsyncEventBus`` event tied to ``task``, then return ``task.result``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for the event to ``AsyncEventBus``. The function
    then issues a wait request for the same event through ``i``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this function is about to wait for.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    i(AsyncEventBus, wait_request)
    return task.result
def
wait_task_implicit( task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
def wait_task_implicit(task: Union[Task, List[Task]]):
    """Same wait as ``wait_task_explicit``, using the interface from ``current_interface()``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for a uniquely named event to ``AsyncEventBus``.
    The function then issues a wait request for that event through the
    current interface and returns ``task.result``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    interface: Interface = current_interface()
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this function is about to wait for.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    interface(AsyncEventBus, wait_request)
    return task.result
def
wait_task( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Wait on a uniquely named ``AsyncEventBus`` event tied to ``task``, then return ``task.result``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for the event to ``AsyncEventBus``. The function
    then issues a wait request for the same event through ``i``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this function is about to wait for.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    i(AsyncEventBus, wait_request)
    return task.result
def
wait_task_im( task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
def wait_task_implicit(task: Union[Task, List[Task]]):
    """Same wait as ``wait_task_explicit``, using the interface from ``current_interface()``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for a uniquely named event to ``AsyncEventBus``.
    The function then issues a wait request for that event through the
    current interface and returns ``task.result``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    interface: Interface = current_interface()
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this function is about to wait for.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    interface(AsyncEventBus, wait_request)
    return task.result
async def
await_task_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Await a uniquely named ``AsyncEventBus`` event tied to ``task``, then return ``task.result``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for the event to ``AsyncEventBus``. The coroutine
    then awaits a wait request for the same event issued through ``i``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this coroutine is about to await.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    await i(AsyncEventBus, wait_request)
    return task.result
async def
await_task_implicit( task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
async def await_task_implicit(task: Union[Task, List[Task]]):
    """Same wait as ``await_task_explicit``, using the interface from ``current_interface()``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for a uniquely named event to ``AsyncEventBus``.
    The coroutine then awaits a wait request for that event issued through
    the current interface and returns ``task.result``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    interface: Interface = current_interface()
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this coroutine is about to await.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    await interface(AsyncEventBus, wait_request)
    return task.result
async def
await_task( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Await a uniquely named ``AsyncEventBus`` event tied to ``task``, then return ``task.result``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for the event to ``AsyncEventBus``. The coroutine
    then awaits a wait request for the same event issued through ``i``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this coroutine is about to await.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    await i(AsyncEventBus, wait_request)
    return task.result
async def
await_task_im( task: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]]):
async def await_task_implicit(task: Union[Task, List[Task]]):
    """Same wait as ``await_task_explicit``, using the interface from ``current_interface()``.

    A done-handler is attached to ``task``; when invoked it puts a
    high-priority request for a uniquely named event to ``AsyncEventBus``.
    The coroutine then awaits a wait request for that event issued through
    the current interface and returns ``task.result``.

    NOTE(review): the annotation admits ``List[Task]``, but the body calls
    ``add_on_done_handler`` and reads ``result`` directly on ``task`` -
    confirm whether lists are really supported.
    """
    interface: Interface = current_interface()
    done_event_id: str = f'wait_task__{uuid4()}'

    def _signal_done():
        # Post the event this coroutine is about to await.
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(_signal_done)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    await interface(AsyncEventBus, wait_request)
    return task.result