cengal.parallel_execution.coroutines.coro_tools.wait_tasks.versions.v_0.wait_tasks

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
# Public API of this module: names exported by `from ... import *`.
# Most waiting helpers come in four flavours: `*_explicit` (takes an Interface),
# `*_implicit` (looks the Interface up itself), and the short aliases
# `<name>` / `<name>_im` for the explicit / implicit variants respectively.
__all__ = [
    'WaitType',
    'TasksListIsEmptyError',
    'IsNotDoneYetError',
    'NormalOnDoneHandler',
    'NormalInfo',
    'FastestOnDoneHandler',
    'FastestInfo',
    'FastestSuccessfulOnDoneHandler',
    'FastestSuccessfulInfo',
    'FastestExceptionOnDoneHandler',
    'FastestExceptionInfo',
    'FastestCustomOnDoneHandler',
    'FastestCustomInfo',
    'AtomicOnDoneHandler',
    'AtomicInfo',
    'AtomicCustomOnDoneHandler',
    'success_result_criteria_handler',
    'exception_result_criteria_handler',
    'same_first_failed_result_formatter',
    'none_first_failed_result_formatter',
    'AtomicCustomInfo',
    'GracefulTerminationSettings',
    'TerminationReason',
    'atask_graceful_destroyer',
    'aterminate_tasks_explicit',
    'aterminate_tasks_implicit',
    'aterminate_tasks',
    'aterminate_tasks_im',
    'terminate_tasks_explicit',
    'terminate_tasks_implicit',
    'terminate_tasks',
    'terminate_tasks_im',
    'await_tasks_explicit',
    'await_tasks_implicit',
    'await_tasks',
    'await_tasks_im',
    'wait_tasks_explicit',
    'wait_tasks_implicit',
    'wait_tasks',
    'wait_tasks_im',
    'await_tasks_fastest_explicit',
    'await_tasks_fastest_implicit',
    'await_tasks_fastest',
    'await_tasks_fastest_im',
    'wait_tasks_fastest_explicit',
    'wait_tasks_fastest_implicit',
    'wait_tasks_fastest',
    'wait_tasks_fastest_im',
    'await_tasks_fastest_successful_explicit',
    'await_tasks_fastest_successful_implicit',
    'await_tasks_fastest_successful',
    'await_tasks_fastest_successful_im',
    'wait_tasks_fastest_successful_explicit',
    'wait_tasks_fastest_successful_implicit',
    'wait_tasks_fastest_successful',
    'wait_tasks_fastest_successful_im',
    'await_tasks_fastest_exception_explicit',
    'await_tasks_fastest_exception_implicit',
    'await_tasks_fastest_exception',
    'await_tasks_fastest_exception_im',
    'wait_tasks_fastest_exception_explicit',
    'wait_tasks_fastest_exception_implicit',
    'wait_tasks_fastest_exception',
    'wait_tasks_fastest_exception_im',
    'await_tasks_fastest_custom_explicit',
    'await_tasks_fastest_custom_implicit',
    'await_tasks_fastest_custom',
    'await_tasks_fastest_custom_im',
    'wait_tasks_fastest_custom_explicit',
    'wait_tasks_fastest_custom_implicit',
    'wait_tasks_fastest_custom',
    'wait_tasks_fastest_custom_im',
    'await_tasks_atomic_explicit',
    'await_tasks_atomic_implicit',
    'await_tasks_atomic',
    'await_tasks_atomic_im',
    'wait_tasks_atomic_explicit',
    'wait_tasks_atomic_implicit',
    'wait_tasks_atomic',
    'wait_tasks_atomic_im',
    'await_tasks_atomic_custom_explicit',
    'await_tasks_atomic_custom_implicit',
    'await_tasks_atomic_custom',
    'await_tasks_atomic_custom_im',
    'wait_tasks_atomic_custom_explicit',
    'wait_tasks_atomic_custom_implicit',
    'wait_tasks_atomic_custom',
    'wait_tasks_atomic_custom_im',
    'wait_task_explicit',
    'wait_task_implicit',
    'wait_task',
    'wait_task_im',
    'await_task_explicit',
    'await_task_implicit',
    'await_task',
    'await_task_im',
]
 118
 119
 120"""
 121Module Docstring
 122Docstrings: http://www.python.org/dev/peps/pep-0257/
 123"""
 124
# Module metadata (authorship, licensing, version and release status).
__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# Exactly one status line is active; the others are kept as alternatives.
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"
 134# __status__ = "Production"
 135
 136
 137from cengal.parallel_execution.coroutines.coro_scheduler import Interface, current_interface, CoroID, get_interface_and_loop_with_explicit_loop
 138from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import Task
 139from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PutCoroList, PSCP
 140from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
 141from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list import KillCoroList, KillSingleCoroParams
 142from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import TimeoutError, SubTimeoutError
 143from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import timer_func_run_on
 144from cengal.parallel_execution.coroutines.coro_tools.coro_flow_control import agraceful_coro_destroyer
 145from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, AsyncEventBus
 146from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
 147from cengal.parallel_execution.coroutines.coro_standard_services_internal_lib.service_with_a_direct_request import put_request_to_service
 148
 149from uuid import uuid4
 150from enum import IntEnum
 151from typing import Optional, List, Union, Any, Type, Tuple, Dict, Callable
 152
 153
 154class WaitType(IntEnum):
 155    normal = 0
 156    fastest = 1
 157    fastest_successful = 2
 158    fastest_exception = 3
 159    fastest_custom = 4
 160    atomic = 5
 161    atomic_custom = 5
 162
 163
 164class TasksListIsEmptyError(Exception):
 165    pass
 166
 167
 168class IsNotDoneYetError(Exception):
 169    pass
 170
 171
 172class NormalOnDoneHandler:
 173    def __init__(self, normal_info: 'NormalInfo') -> None:
 174        self.normal_info: 'NormalInfo' = normal_info
 175    
 176    def __call__(self, task: Task) -> Any:
 177        self.normal_info.add_result(task.coro_id, task._result, task._exception)
 178
 179
 180class NormalInfo:
 181    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
 182        self.tasks: List[Task] = tasks
 183        self.coroutines_num: int = len(tasks)
 184        self.results: List[Tuple[CoroID, Any, Exception]] = list()
 185        self.wait_done_id: str = wait_done_id
 186        self.ignore: bool = False
 187        self._done: bool = False
 188        self._timeout: bool = False
 189        self._failure: bool = False
 190    
 191    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 192        if self.ignore:
 193            return
 194        
 195        self.results.append((coro_id, result, exception))
 196        if len(self.results) >= self.coroutines_num:
 197            self.done = True
 198    
 199    @property
 200    def failure(self) -> bool:
 201        return self._failure
 202    
 203    @failure.setter
 204    def failure(self, value: bool) -> None:
 205        self._failure = value
 206    
 207    @property
 208    def done(self) -> bool:
 209        return self._done
 210    
 211    @done.setter
 212    def done(self, value: bool) -> None:
 213        if self._done:
 214            return
 215        
 216        self._done = value
 217        if value:
 218            self.ignore = True
 219            put_request_to_service(AsyncEventBus, self.wait_done_id, None, CoroPriority.high)
 220    
 221    @property
 222    def timeout(self) -> bool:
 223        return self._timeout
 224    
 225    def on_timeout(self) -> None:
 226        self._timeout = True
 227        self.done = True
 228    
 229    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 230        if not self.done:
 231            raise IsNotDoneYetError
 232        
 233        return self.results
 234
 235
 236# =================================================================================
 237
 238
 239class FastestOnDoneHandler:
 240    def __init__(self, fastest_info: 'FastestInfo') -> None:
 241        self.fastest_info: 'FastestInfo' = fastest_info
 242    
 243    def __call__(self, task: Task) -> Any:
 244        self.fastest_info.add_result((task.coro_id, task._result, task._exception))
 245
 246
 247class FastestInfo(NormalInfo):
 248    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
 249        super().__init__(wait_done_id, tasks)
 250        self.coroutines_num: int = 1
 251
 252
 253# =================================================================================
 254
 255
 256class FastestSuccessfulOnDoneHandler:
 257    def __init__(self, fastest_successful_info: 'FastestSuccessfulInfo') -> None:
 258        self.fastest_successful_info: 'FastestSuccessfulInfo' = fastest_successful_info
 259    
 260    def __call__(self, task: Task) -> Any:
 261        self.fastest_successful_info.add_result((task.coro_id, task._result, task._exception))
 262
 263
 264class FastestSuccessfulInfo(NormalInfo):
 265    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
 266        super().__init__(wait_done_id, tasks)
 267        self.fastest_successful_result: Optional[Tuple[CoroID, Any, Exception]] = None
 268    
 269    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 270        if self.ignore:
 271            return
 272        
 273        self.results.append((coro_id, result, exception))
 274        if exception is None:
 275            self.fastest_successful_result = (coro_id, result, exception)
 276            self.done = True
 277        elif (len(self.results) >= self.coroutines_num):
 278            self.done = True
 279    
 280    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 281        if not self.done:
 282            raise IsNotDoneYetError
 283        
 284        if self.fastest_successful_result is None:
 285            self.failure = True
 286        
 287        return [self.fastest_successful_result]
 288
 289
 290# =================================================================================
 291
 292
 293class FastestExceptionOnDoneHandler:
 294    def __init__(self, fastest_exception_info: 'FastestExceptionInfo') -> None:
 295        self.fastest_exception_info: 'FastestExceptionInfo' = fastest_exception_info
 296    
 297    def __call__(self, task: Task) -> Any:
 298        self.fastest_exception_info.add_result((task.coro_id, task._result, task._exception))
 299
 300
 301class FastestExceptionInfo(NormalInfo):
 302    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
 303        super().__init__(wait_done_id, tasks)
 304        self.fastest_exception_result: Optional[Tuple[CoroID, Any, Exception]] = None
 305    
 306    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 307        if self.ignore:
 308            return
 309        
 310        self.results.append((coro_id, result, exception))
 311        if exception is not None:
 312            self.fastest_exception_result = (coro_id, result, exception)
 313            self.done = True
 314        elif (len(self.results) >= self.coroutines_num):
 315            self.done = True
 316    
 317    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 318        if not self.done:
 319            raise IsNotDoneYetError
 320        
 321        if self.fastest_exception_result is None:
 322            self.failure = True
 323        
 324        return [self.fastest_exception_result]
 325
 326
 327# =================================================================================
 328
 329
 330class FastestCustomOnDoneHandler:
 331    def __init__(self, fastest_custom_info: 'FastestCustomInfo') -> None:
 332        self.fastest_custom_info: 'FastestCustomInfo' = fastest_custom_info
 333    
 334    def __call__(self, task: Task) -> Any:
 335        self.fastest_custom_info.add_result((task.coro_id, task._result, task._exception))
 336
 337
 338class FastestCustomInfo(NormalInfo):
 339    def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
 340        super().__init__(wait_done_id, tasks)
 341        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
 342        self.fastest_custom_result: Optional[Tuple[CoroID, Any, Exception]] = None
 343    
 344    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 345        if self.ignore:
 346            return
 347        
 348        self.results.append((coro_id, result, exception))
 349        if self.result_criteria_handler(coro_id, result, exception):
 350            self.fastest_custom_result = (coro_id, result, exception)
 351            self.done = True
 352        elif (len(self.results) >= self.coroutines_num):
 353            self.done = True
 354    
 355    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 356        if not self.done:
 357            raise IsNotDoneYetError
 358        
 359        if self.fastest_custom_result is None:
 360            self.failure = True
 361        
 362        return [self.fastest_custom_result]
 363
 364
 365# =================================================================================
 366
 367
 368class AtomicOnDoneHandler:
 369    def __init__(self, atomic_info: 'AtomicInfo') -> None:
 370        self.atomic_info: 'AtomicInfo' = atomic_info
 371    
 372    def __call__(self, task: Task) -> Any:
 373        self.atomic_info.add_result(task.coro_id, task._result, task._exception)
 374
 375
 376class AtomicInfo(NormalInfo):
 377    def __init__(self, wait_done_id: str, tasks: List[Task], include_first_failure: bool = True) -> None:
 378        super().__init__(wait_done_id, tasks)
 379        self.include_first_failure: bool = include_first_failure
 380    
 381    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 382        if self.ignore:
 383            return
 384        
 385        if exception is not None:
 386            if self.include_first_failure:
 387                self.results.append((coro_id, result, exception))
 388            
 389            self.failure = True
 390            self.done = True
 391        else:
 392            self.results.append((coro_id, result, exception))
 393        
 394        if len(self.results) >= self.coroutines_num:
 395            self.done = True
 396    
 397    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 398        if not self.done:
 399            raise IsNotDoneYetError
 400        
 401        normalized_results: List[Tuple[CoroID, Any, Exception]] = list()
 402        provided_results: Dict[CoroID, Tuple[Any, Exception]] = dict()
 403        for result_info in self.results:
 404            if result_info is None:
 405                continue
 406
 407            coro_id, result, exception = result_info
 408            provided_results[coro_id] = (result, exception)
 409        
 410        for task in self.tasks:
 411            coro_id = task.coro_id
 412            if coro_id in provided_results:
 413                result, exception = provided_results[coro_id]
 414                normalized_results.append((coro_id, result, exception))
 415            else:
 416                normalized_results.append(None)
 417        
 418        return normalized_results
 419
 420
 421# =================================================================================
 422
 423
 424class AtomicCustomOnDoneHandler:
 425    def __init__(self, atomic_custom_info: 'AtomicCustomInfo') -> None:
 426        self.atomic_custom_info: 'AtomicCustomInfo' = atomic_custom_info
 427    
 428    def __call__(self, task: Task) -> Any:
 429        self.atomic_custom_info.add_result(task.coro_id, task._result, task._exception)
 430
 431
 432def success_result_criteria_handler(coro_id: CoroID, result: Any, exception: Exception) -> bool:
 433    return exception is None
 434
 435
 436def exception_result_criteria_handler(coro_id: CoroID, result: Any, exception: Exception) -> bool:
 437    return exception is not None
 438
 439
 440def same_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Exception) -> Any:
 441    return (coro_id, result, exception)
 442
 443
 444def none_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Exception) -> Any:
 445    return None
 446
 447
 448class AtomicCustomInfo(NormalInfo):
 449    def __init__(
 450            self, 
 451            wait_done_id: str, 
 452            tasks: List[Task], 
 453            result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 
 454            first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
 455            ) -> None:
 456        super().__init__(wait_done_id, tasks)
 457        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
 458        self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = \
 459            none_first_failed_result_formatter if first_failed_result_formatter is None else first_failed_result_formatter
 460    
 461    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
 462        if self.ignore:
 463            return
 464        
 465        self.results.append((coro_id, result, exception))
 466        if (len(self.results) >= self.coroutines_num) or (exception is not None):
 467            self.done = True
 468        
 469        if not self.result_criteria_handler(coro_id, result, exception):
 470            result = self.first_failed_result_formatter(coro_id, result, exception)
 471            self.failure = True
 472            self.done = True
 473        else:
 474            result = (coro_id, result, exception)
 475        
 476        self.results.append(result)
 477        if len(self.results) >= self.coroutines_num:
 478            self.done = True
 479    
 480    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
 481        if not self.done:
 482            raise IsNotDoneYetError
 483        
 484        normalized_results: List[Tuple[CoroID, Any, Exception]] = list()
 485        provided_results: Dict[CoroID, Tuple[Any, Exception]] = dict()
 486        for result_info in self.results:
 487            if result_info is None:
 488                continue
 489
 490            coro_id, result, exception = result_info
 491            provided_results[coro_id] = (result, exception)
 492        
 493        for task in self.tasks:
 494            coro_id = task.coro_id
 495            if coro_id in provided_results:
 496                result, exception = provided_results[coro_id]
 497                normalized_results.append((coro_id, result, exception))
 498            else:
 499                normalized_results.append(None)
 500        
 501        return normalized_results
 502
 503
 504# =================================================================================
 505
 506
 507class GracefulTerminationSettings:
 508    def __init__(self, 
 509                 phase_time_limit: Optional[float], 
 510                 ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None, 
 511                 first_phase_is_wait: bool = True,
 512                 last_phase_is_kill: bool = True, 
 513                 ) -> None:
 514        self.phase_time_limit: Optional[float] = phase_time_limit
 515        self.ex_type: Type[Exception] = ex_type
 516        self.ex_value: Exception = ex_value
 517        self.ex_traceback: Any = ex_traceback
 518        self.first_phase_is_wait: bool = first_phase_is_wait
 519        self.last_phase_is_kill: bool = last_phase_is_kill
 520
 521
 522class TerminationReason(IntEnum):
 523    success = 0
 524    failure = 1
 525    timeout = 2
 526
 527                
 528async def atask_graceful_destroyer(
 529        i: Interface, 
 530        task: Task, 
 531        tree: bool = False,
 532        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 533        ) -> None:
 534    await agraceful_coro_destroyer(
 535        i, 
 536        graceful_termination_settings.phase_time_limit, 
 537        task.coro_id, 
 538        graceful_termination_settings.ex_type, 
 539        graceful_termination_settings.ex_value, 
 540        graceful_termination_settings.ex_traceback, 
 541        tree, 
 542        graceful_termination_settings.first_phase_is_wait, 
 543        graceful_termination_settings.last_phase_is_kill, 
 544        )
 545
 546
 547async def aterminate_tasks_explicit(
 548        i: Interface, 
 549        tasks: List[Task], 
 550        termination_reason: TerminationReason,
 551        tree: bool = False,
 552        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 553        terminate_other_tasks: Optional[bool] = None, 
 554        terminate_other_tasks_on_success: bool = False, 
 555        terminate_other_tasks_on_failure: bool = False, 
 556        terminate_other_tasks_on_timeout: bool = False, 
 557        wait_for_termination: bool = False, 
 558        ) -> bool:
 559    if not tasks:
 560        return False
 561    
 562    if terminate_other_tasks is not None:
 563        terminate_other_tasks_on_success = terminate_other_tasks
 564        terminate_other_tasks_on_failure = terminate_other_tasks
 565        terminate_other_tasks_on_timeout = terminate_other_tasks
 566
 567    if ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
 568        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
 569            ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout):
 570        if graceful_termination_settings is None:
 571            termination_tasks_params: List[KillSingleCoroParams] = list([KillSingleCoroParams(task.coro_id, tree) for task in tasks if not task.done])
 572            if not termination_tasks_params:
 573                return False
 574            
 575            i(KillCoroList, termination_tasks)
 576        else:
 577            termination_tasks_params: List[PSCP] = [PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings) for task in tasks if not task.done]
 578            if not termination_tasks_params:
 579                return False
 580            
 581            termination_tasks: List[Task] = await i(PutCoroList, termination_tasks_params, True)
 582            if wait_for_termination:
 583                await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)
 584    
 585    return True
 586
 587
 588async def aterminate_tasks_implicit(
 589        tasks: List[Task], 
 590        termination_reason: TerminationReason,
 591        tree: bool = False,
 592        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 593        terminate_other_tasks: Optional[bool] = None, 
 594        terminate_other_tasks_on_success: bool = False, 
 595        terminate_other_tasks_on_failure: bool = False, 
 596        terminate_other_tasks_on_timeout: bool = False, 
 597        wait_for_termination: bool = False, 
 598        ) -> bool:
 599    i: Interface = current_interface()
 600    return await aterminate_tasks_explicit(
 601        i, 
 602        tasks, 
 603        termination_reason, 
 604        tree, 
 605        graceful_termination_settings, 
 606        terminate_other_tasks, 
 607        terminate_other_tasks_on_success, 
 608        terminate_other_tasks_on_failure, 
 609        terminate_other_tasks_on_timeout, 
 610        wait_for_termination, 
 611        )
 612
 613
# Short aliases: the bare name is the explicit-interface variant,
# the `_im` suffix is the implicit (current_interface()) variant.
aterminate_tasks: Callable = aterminate_tasks_explicit
aterminate_tasks_im: Callable = aterminate_tasks_implicit
 616
 617
 618def terminate_tasks_explicit(
 619        i: Interface, 
 620        tasks: List[Task], 
 621        termination_reason: TerminationReason,
 622        tree: bool = False,
 623        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 624        terminate_other_tasks: Optional[bool] = None, 
 625        terminate_other_tasks_on_success: bool = False, 
 626        terminate_other_tasks_on_failure: bool = False, 
 627        terminate_other_tasks_on_timeout: bool = False, 
 628        wait_for_termination: bool = False, 
 629        ) -> bool:
 630    i(RunCoro, aterminate_tasks_explicit,
 631        tasks, 
 632        termination_reason, 
 633        tree, 
 634        graceful_termination_settings, 
 635        terminate_other_tasks, 
 636        terminate_other_tasks_on_success, 
 637        terminate_other_tasks_on_failure, 
 638        terminate_other_tasks_on_timeout, 
 639        wait_for_termination, 
 640        )
 641
 642
 643def terminate_tasks_implicit(
 644        tasks: List[Task], 
 645        termination_reason: TerminationReason,
 646        tree: bool = False,
 647        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 648        terminate_other_tasks: Optional[bool] = None, 
 649        terminate_other_tasks_on_success: bool = False, 
 650        terminate_other_tasks_on_failure: bool = False, 
 651        terminate_other_tasks_on_timeout: bool = False, 
 652        wait_for_termination: bool = False, 
 653        ) -> bool:
 654    i: Interface = current_interface()
 655    return terminate_tasks_explicit(
 656        i, 
 657        tasks, 
 658        termination_reason, 
 659        tree, 
 660        graceful_termination_settings, 
 661        terminate_other_tasks, 
 662        terminate_other_tasks_on_success, 
 663        terminate_other_tasks_on_failure, 
 664        terminate_other_tasks_on_timeout, 
 665        wait_for_termination, 
 666        )
 667
 668
# Short aliases: the bare name is the explicit-interface variant,
# the `_im` suffix is the implicit (current_interface()) variant.
terminate_tasks: Callable = terminate_tasks_explicit
terminate_tasks_im: Callable = terminate_tasks_implicit
 671
 672
 673# =================================================================================
 674
 675
 676async def await_tasks_explicit(
 677        i: Interface, 
 678        tasks: Union[Task, List[Task]], 
 679        terminate_tree: bool = False,
 680        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 681        timeout: Optional[float] = None, 
 682        terminate_other_tasks: Optional[bool] = None, 
 683        terminate_other_tasks_on_success: bool = False, 
 684        terminate_other_tasks_on_failure: bool = False, 
 685        terminate_other_tasks_on_timeout: bool = False, 
 686        wait_for_termination: bool = False, 
 687        ) -> List[Tuple[CoroID, Any, Exception]]:
 688    if isinstance(tasks, Task):
 689        tasks = [tasks]
 690    
 691    if not tasks:
 692        raise TasksListIsEmptyError
 693
 694    normal_wait_id: str = str(f'wait_task__normal__{uuid4()}')
 695    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)
 696
 697    if timeout is not None:
 698        # TODO: implement discard timer request when needed in order to improve performance
 699        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, normal_info.on_timeout)
 700
 701    for another_task in tasks:
 702        another_task.add_on_done_handler(NormalOnDoneHandler(normal_info))
 703    
 704    await i(AsyncEventBus, AsyncEventBusRequest().wait(normal_wait_id))
 705    results: List[Tuple[CoroID, Any, Exception]] = normal_info.gather()
 706    if normal_info.timeout:
 707        termination_reason: TerminationReason = TerminationReason.timeout
 708    elif normal_info.failure:
 709        termination_reason = TerminationReason.failure
 710    else:
 711        termination_reason = TerminationReason.success
 712
 713    await terminate_tasks_explicit(
 714        i, 
 715        tasks, 
 716        termination_reason, 
 717        terminate_tree, 
 718        graceful_termination_settings, 
 719        terminate_other_tasks, 
 720        terminate_other_tasks_on_success, 
 721        terminate_other_tasks_on_failure, 
 722        terminate_other_tasks_on_timeout, 
 723        wait_for_termination, 
 724        )
 725    return results
 726
 727
 728async def await_tasks_implicit(
 729        tasks: Union[Task, List[Task]], 
 730        terminate_tree: bool = False,
 731        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 732        timeout: Optional[float] = None, 
 733        terminate_other_tasks: Optional[bool] = None, 
 734        terminate_other_tasks_on_success: bool = False, 
 735        terminate_other_tasks_on_failure: bool = False, 
 736        terminate_other_tasks_on_timeout: bool = False, 
 737        wait_for_termination: bool = False, 
 738        ) -> List[Tuple[CoroID, Any, Exception]]:
 739    i: Interface = current_interface()
 740    return await await_tasks_explicit(
 741        i, 
 742        tasks, 
 743        terminate_tree, 
 744        graceful_termination_settings, 
 745        timeout, 
 746        terminate_other_tasks, 
 747        terminate_other_tasks_on_success, 
 748        terminate_other_tasks_on_failure, 
 749        terminate_other_tasks_on_timeout, 
 750        wait_for_termination, 
 751        )
 752
 753
# Short aliases: the bare name is the explicit-interface variant,
# the `_im` suffix is the implicit (current_interface()) variant.
await_tasks: Callable = await_tasks_explicit
await_tasks_im: Callable = await_tasks_implicit
 756
 757
 758def wait_tasks_explicit(
 759        i: Interface, 
 760        tasks: Union[Task, List[Task]], 
 761        terminate_tree: bool = False,
 762        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 763        timeout: Optional[float] = None, 
 764        terminate_other_tasks: Optional[bool] = None, 
 765        terminate_other_tasks_on_success: bool = False, 
 766        terminate_other_tasks_on_failure: bool = False, 
 767        terminate_other_tasks_on_timeout: bool = False, 
 768        wait_for_termination: bool = False, 
 769        ) -> List[Tuple[CoroID, Any, Exception]]:
 770    i(RunCoro, await_tasks_explicit,
 771        tasks, 
 772        terminate_tree, 
 773        graceful_termination_settings, 
 774        timeout, 
 775        terminate_other_tasks, 
 776        terminate_other_tasks_on_success, 
 777        terminate_other_tasks_on_failure, 
 778        terminate_other_tasks_on_timeout, 
 779        wait_for_termination, 
 780        )
 781
 782
 783def wait_tasks_implicit(
 784        tasks: Union[Task, List[Task]], 
 785        terminate_tree: bool = False,
 786        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 787        timeout: Optional[float] = None, 
 788        terminate_other_tasks: Optional[bool] = None, 
 789        terminate_other_tasks_on_success: bool = False, 
 790        terminate_other_tasks_on_failure: bool = False, 
 791        terminate_other_tasks_on_timeout: bool = False, 
 792        wait_for_termination: bool = False, 
 793        ) -> List[Tuple[CoroID, Any, Exception]]:
 794    i: Interface = current_interface()
 795    return wait_tasks_explicit(
 796        i, 
 797        tasks, 
 798        terminate_tree, 
 799        graceful_termination_settings, 
 800        timeout, 
 801        terminate_other_tasks, 
 802        terminate_other_tasks_on_success, 
 803        terminate_other_tasks_on_failure, 
 804        terminate_other_tasks_on_timeout, 
 805        wait_for_termination, 
 806        )
 807
 808
# Short aliases: `wait_tasks` takes the interface explicitly as its first
# argument, `wait_tasks_im` obtains it from `current_interface()`.
wait_tasks: Callable = wait_tasks_explicit
wait_tasks_im: Callable = wait_tasks_implicit
 811
 812
 813# =================================================================================
 814
 815
 816async def await_tasks_fastest_explicit(
 817        i: Interface, 
 818        tasks: Union[Task, List[Task]], 
 819        terminate_tree: bool = False,
 820        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 821        timeout: Optional[float] = None, 
 822        terminate_other_tasks: Optional[bool] = None, 
 823        terminate_other_tasks_on_success: bool = False, 
 824        terminate_other_tasks_on_failure: bool = False, 
 825        terminate_other_tasks_on_timeout: bool = False, 
 826        wait_for_termination: bool = False, 
 827        ) -> Tuple[CoroID, Any]:
 828    if isinstance(tasks, Task):
 829        tasks = [tasks]
 830    
 831    if not tasks:
 832        raise TasksListIsEmptyError
 833
 834    fastest_wait_id: str = str(f'wait_task__fastest__{uuid4()}')
 835    fastest_info: FastestInfo = FastestInfo(fastest_wait_id, tasks)
 836
 837    if timeout is not None:
 838        # TODO: implement discard timer request when needed in order to improve performance
 839        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, fastest_info.on_timeout)
 840
 841    for another_task in tasks:
 842        another_task.add_on_done_handler(FastestOnDoneHandler(fastest_info))
 843    
 844    await i(AsyncEventBus, AsyncEventBusRequest().wait(fastest_wait_id))
 845    results: List[Tuple[CoroID, Any, Exception]] = fastest_info.gather()
 846    if fastest_info.timeout:
 847        termination_reason: TerminationReason = TerminationReason.timeout
 848    elif fastest_info.failure:
 849        termination_reason = TerminationReason.failure
 850    else:
 851        termination_reason = TerminationReason.success
 852
 853    await aterminate_tasks_explicit(
 854        i, 
 855        tasks, 
 856        termination_reason, 
 857        terminate_tree, 
 858        graceful_termination_settings, 
 859        terminate_other_tasks, 
 860        terminate_other_tasks_on_success, 
 861        terminate_other_tasks_on_failure, 
 862        terminate_other_tasks_on_timeout, 
 863        wait_for_termination, 
 864        )
 865    if not results:
 866        return None, None
 867    
 868    coro_id, result, exception = results[0]
 869    if exception is not None:
 870        if isinstance(exception, TimeoutError) and (coro_id is not None):
 871            exception = SubTimeoutError()
 872        
 873        raise exception
 874    
 875    return coro_id, result
 876
 877
 878async def await_tasks_fastest_implicit(
 879        tasks: Union[Task, List[Task]], 
 880        terminate_tree: bool = False,
 881        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 882        timeout: Optional[float] = None, 
 883        terminate_other_tasks: Optional[bool] = None, 
 884        terminate_other_tasks_on_success: bool = False, 
 885        terminate_other_tasks_on_failure: bool = False, 
 886        terminate_other_tasks_on_timeout: bool = False, 
 887        wait_for_termination: bool = False, 
 888        ) -> List[Tuple[CoroID, Any, Exception]]:
 889    i: Interface = current_interface()
 890    return await await_tasks_fastest_explicit(
 891        i, 
 892        tasks, 
 893        terminate_tree, 
 894        graceful_termination_settings, 
 895        timeout, 
 896        terminate_other_tasks, 
 897        terminate_other_tasks_on_success, 
 898        terminate_other_tasks_on_failure, 
 899        terminate_other_tasks_on_timeout, 
 900        wait_for_termination, 
 901        )
 902
 903
# Short aliases: `await_tasks_fastest` takes the interface explicitly as its
# first argument, `await_tasks_fastest_im` obtains it from `current_interface()`.
await_tasks_fastest: Callable = await_tasks_fastest_explicit
await_tasks_fastest_im: Callable = await_tasks_fastest_implicit
 906
 907
 908def wait_tasks_fastest_explicit(
 909        i: Interface, 
 910        tasks: Union[Task, List[Task]], 
 911        terminate_tree: bool = False,
 912        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 913        timeout: Optional[float] = None, 
 914        terminate_other_tasks: Optional[bool] = None, 
 915        terminate_other_tasks_on_success: bool = False, 
 916        terminate_other_tasks_on_failure: bool = False, 
 917        terminate_other_tasks_on_timeout: bool = False, 
 918        wait_for_termination: bool = False, 
 919        ) -> List[Tuple[CoroID, Any, Exception]]:
 920    i(RunCoro, await_tasks_fastest_explicit,
 921        tasks, 
 922        terminate_tree, 
 923        graceful_termination_settings, 
 924        timeout, 
 925        terminate_other_tasks, 
 926        terminate_other_tasks_on_success, 
 927        terminate_other_tasks_on_failure, 
 928        terminate_other_tasks_on_timeout, 
 929        wait_for_termination, 
 930        )
 931
 932
 933def wait_tasks_fastest_implicit(
 934        tasks: Union[Task, List[Task]], 
 935        terminate_tree: bool = False,
 936        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 937        timeout: Optional[float] = None, 
 938        terminate_other_tasks: Optional[bool] = None, 
 939        terminate_other_tasks_on_success: bool = False, 
 940        terminate_other_tasks_on_failure: bool = False, 
 941        terminate_other_tasks_on_timeout: bool = False, 
 942        wait_for_termination: bool = False, 
 943        ) -> List[Tuple[CoroID, Any, Exception]]:
 944    i: Interface = current_interface()
 945    return wait_tasks_fastest_explicit(
 946        i, 
 947        tasks, 
 948        terminate_tree, 
 949        graceful_termination_settings, 
 950        timeout, 
 951        terminate_other_tasks, 
 952        terminate_other_tasks_on_success, 
 953        terminate_other_tasks_on_failure, 
 954        terminate_other_tasks_on_timeout, 
 955        wait_for_termination, 
 956        )
 957
 958
# Short aliases: `wait_tasks_fastest` takes the interface explicitly as its
# first argument, `wait_tasks_fastest_im` obtains it from `current_interface()`.
wait_tasks_fastest: Callable = wait_tasks_fastest_explicit
wait_tasks_fastest_im: Callable = wait_tasks_fastest_implicit
 961
 962
 963# =================================================================================
 964
 965
 966async def await_tasks_fastest_successful_explicit(
 967        i: Interface, 
 968        tasks: Union[Task, List[Task]], 
 969        terminate_tree: bool = False,
 970        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 971        timeout: Optional[float] = None, 
 972        terminate_other_tasks: Optional[bool] = None, 
 973        terminate_other_tasks_on_success: bool = False, 
 974        terminate_other_tasks_on_failure: bool = False, 
 975        terminate_other_tasks_on_timeout: bool = False, 
 976        wait_for_termination: bool = False, 
 977        ) -> Tuple[CoroID, Any]:
 978    if isinstance(tasks, Task):
 979        tasks = [tasks]
 980    
 981    if not tasks:
 982        raise TasksListIsEmptyError
 983
 984    fastest_successful_wait_id: str = str(f'wait_task__fastest_successful__{uuid4()}')
 985    fastest_successful_info: FastestSuccessfulInfo = FastestSuccessfulInfo(fastest_successful_wait_id, tasks)
 986
 987    if timeout is not None:
 988        # TODO: implement discard timer request when needed in order to improve performance
 989        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, fastest_successful_info.on_timeout)
 990
 991    for another_task in tasks:
 992        another_task.add_on_done_handler(FastestSuccessfulOnDoneHandler(fastest_successful_info))
 993    
 994    await i(AsyncEventBus, AsyncEventBusRequest().wait(fastest_successful_wait_id))
 995    results: List[Tuple[CoroID, Any, Exception]] = fastest_successful_info.gather()
 996    if fastest_successful_info.timeout:
 997        termination_reason: TerminationReason = TerminationReason.timeout
 998    elif fastest_successful_info.failure:
 999        termination_reason = TerminationReason.failure
1000    else:
1001        termination_reason = TerminationReason.success
1002
1003    await aterminate_tasks_explicit(
1004        i, 
1005        tasks, 
1006        termination_reason, 
1007        terminate_tree, 
1008        graceful_termination_settings, 
1009        terminate_other_tasks, 
1010        terminate_other_tasks_on_success, 
1011        terminate_other_tasks_on_failure, 
1012        terminate_other_tasks_on_timeout, 
1013        wait_for_termination, 
1014        )
1015    if not results:
1016        return None, None
1017    
1018    coro_id, result, exception = results[0]
1019    if exception is not None:
1020        if isinstance(exception, TimeoutError) and (coro_id is not None):
1021            exception = SubTimeoutError()
1022        
1023        raise exception
1024    
1025    return coro_id, result
1026
1027
async def await_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Await ``await_tasks_fastest_successful_explicit`` using ``current_interface()``.

    All arguments are forwarded unchanged.

    Returns:
        The ``(coro_id, result)`` pair produced by
        ``await_tasks_fastest_successful_explicit`` (the previous
        ``List[...]`` annotation did not match what is actually returned).

    Raises:
        Whatever ``await_tasks_fastest_successful_explicit`` raises.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1052
1053
# Short aliases: `await_tasks_fastest_successful` takes the interface
# explicitly as its first argument, `await_tasks_fastest_successful_im`
# obtains it from `current_interface()`.
await_tasks_fastest_successful: Callable = await_tasks_fastest_successful_explicit
await_tasks_fastest_successful_im: Callable = await_tasks_fastest_successful_implicit
1056
1057
def wait_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Non-async counterpart of ``await_tasks_fastest_successful_explicit``.

    Sends a ``RunCoro`` request through ``i`` that names
    ``await_tasks_fastest_successful_explicit`` as the coroutine and forwards
    every other argument unchanged and in the same order.

    Returns:
        The value the ``RunCoro`` request evaluates to.
        NOTE(review): expected to be the ``(coro_id, result)`` pair of
        ``await_tasks_fastest_successful_explicit`` - confirm against the
        ``RunCoro`` service documentation.
    """
    # Two fixes compared to the earlier revision:
    #  * the coroutine dispatched was `await_tasks_explicit`, i.e. the plain
    #    wait instead of the fastest-successful one;
    #  * the request result was discarded, so None was always returned.
    return i(
        RunCoro, await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1081
1082
def wait_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_successful_explicit`` with ``current_interface()``.

    All arguments are forwarded unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_successful_explicit`` returns. The
        annotation reflects the ``(coro_id, result)`` shape produced by
        ``await_tasks_fastest_successful_explicit`` rather than a list.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1107
1108
# Short aliases: `wait_tasks_fastest_successful` takes the interface
# explicitly as its first argument, `wait_tasks_fastest_successful_im`
# obtains it from `current_interface()`.
wait_tasks_fastest_successful: Callable = wait_tasks_fastest_successful_explicit
wait_tasks_fastest_successful_im: Callable = wait_tasks_fastest_successful_implicit
1111
1112
1113# =================================================================================
1114
1115
async def await_tasks_fastest_exception_explicit(
        i: Interface, 
        tasks: Union[Task, List[Task]], 
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None, 
        terminate_other_tasks: Optional[bool] = None, 
        terminate_other_tasks_on_success: bool = False, 
        terminate_other_tasks_on_failure: bool = False, 
        terminate_other_tasks_on_timeout: bool = False, 
        wait_for_termination: bool = False, 
        ) -> Tuple[CoroID, Exception]:
    """Wait on ``tasks`` using ``FastestExceptionInfo`` and return the first gathered exception.

    A ``FastestExceptionOnDoneHandler`` is attached to every task, then the
    coroutine waits on the ``AsyncEventBus`` for a uniquely named event.
    Afterwards ``aterminate_tasks_explicit`` is called with a reason derived
    from the ``timeout`` / ``failure`` flags of the info object.

    Unlike the other "fastest" waiters in this module, the exception is
    returned to the caller, not raised.

    Returns:
        ``(coro_id, exception)`` of the first gathered entry, or
        ``(None, None)`` when nothing was gathered. A ``TimeoutError`` that
        comes with a non-None coroutine id is replaced by a fresh
        ``SubTimeoutError`` instance.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_exception__{uuid4()}'
    info: FastestExceptionInfo = FastestExceptionInfo(wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        loop_bound_interface = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(loop_bound_interface, timeout, info.on_timeout)

    # Each task gets its own handler instance; all of them share `info`.
    for task in tasks:
        task.add_on_done_handler(FastestExceptionOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure, failure over success.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        tasks,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    # The plain result slot is not used by this waiter.
    source_id, _unused_result, error = gathered[0]
    # `isinstance(None, TimeoutError)` is False, so a missing exception falls
    # through untouched without a separate None check.
    if isinstance(error, TimeoutError) and (source_id is not None):
        error = SubTimeoutError()

    return source_id, error
1175
async def await_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Await ``await_tasks_fastest_exception_explicit`` using ``current_interface()``.

    All arguments are forwarded unchanged.

    Returns:
        The ``(coro_id, exception)`` pair produced by
        ``await_tasks_fastest_exception_explicit``.

    Raises:
        Whatever ``await_tasks_fastest_exception_explicit`` raises.
    """
    i: Interface = current_interface()
    # Fix: this wrapper used to delegate to `await_tasks_explicit`, i.e. it
    # performed the plain wait instead of the fastest-exception wait that its
    # name and its explicit sibling promise.
    return await await_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1200
1201
# Short aliases: `await_tasks_fastest_exception` takes the interface
# explicitly as its first argument, `await_tasks_fastest_exception_im`
# obtains it from `current_interface()`.
await_tasks_fastest_exception: Callable = await_tasks_fastest_exception_explicit
await_tasks_fastest_exception_im: Callable = await_tasks_fastest_exception_implicit
1204
1205
def wait_tasks_fastest_exception_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Non-async counterpart of ``await_tasks_fastest_exception_explicit``.

    Sends a ``RunCoro`` request through ``i`` that names
    ``await_tasks_fastest_exception_explicit`` as the coroutine and forwards
    every other argument unchanged and in the same order.

    Returns:
        The value the ``RunCoro`` request evaluates to.
        NOTE(review): expected to be the ``(coro_id, exception)`` pair of
        ``await_tasks_fastest_exception_explicit`` - confirm against the
        ``RunCoro`` service documentation.
    """
    # The request result used to be discarded, which made this function (and
    # `wait_tasks_fastest_exception_implicit`, which delegates to it) always
    # return None.
    return i(
        RunCoro, await_tasks_fastest_exception_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1229
1230
def wait_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Call ``wait_tasks_fastest_exception_explicit`` with ``current_interface()``.

    All arguments are forwarded unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_exception_explicit`` returns. The
        annotation reflects the ``(coro_id, exception)`` shape produced by
        ``await_tasks_fastest_exception_explicit`` rather than a list.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1255
1256
# Short aliases: `wait_tasks_fastest_exception` takes the interface
# explicitly as its first argument, `wait_tasks_fastest_exception_im`
# obtains it from `current_interface()`.
wait_tasks_fastest_exception: Callable = wait_tasks_fastest_exception_explicit
wait_tasks_fastest_exception_im: Callable = wait_tasks_fastest_exception_implicit
1259
1260
1261# =================================================================================
1262
1263
async def await_tasks_fastest_custom_explicit(
        i: Interface, 
        tasks: Union[Task, List[Task]], 
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None, 
        terminate_other_tasks: Optional[bool] = None, 
        terminate_other_tasks_on_success: bool = False, 
        terminate_other_tasks_on_failure: bool = False, 
        terminate_other_tasks_on_timeout: bool = False, 
        wait_for_termination: bool = False, 
        ) -> Tuple[CoroID, Any]:
    """Wait on ``tasks`` using ``FastestCustomInfo`` and return the first gathered result.

    ``result_criteria_handler`` is handed to ``FastestCustomInfo`` together
    with the wait id and the tasks. A ``FastestCustomOnDoneHandler`` is
    attached to every task, then the coroutine waits on the ``AsyncEventBus``
    for a uniquely named event. Afterwards ``aterminate_tasks_explicit`` is
    called with a reason derived from the ``timeout`` / ``failure`` flags of
    the info object.

    Returns:
        ``(coro_id, result)`` of the first gathered entry, or ``(None, None)``
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        Exception: the exception stored in the first gathered entry. A
            ``TimeoutError`` that comes with a non-None coroutine id is
            replaced by ``SubTimeoutError`` (presumably to distinguish a
            task's own timeout from this wait's timeout - confirm).
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_custom__{uuid4()}'
    info: FastestCustomInfo = FastestCustomInfo(wait_id, tasks, result_criteria_handler)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        loop_bound_interface = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(loop_bound_interface, timeout, info.on_timeout)

    # Each task gets its own handler instance; all of them share `info`.
    for task in tasks:
        task.add_on_done_handler(FastestCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure, failure over success.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        tasks,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    winner_id, value, error = gathered[0]
    if error is None:
        return winner_id, value

    if isinstance(error, TimeoutError) and (winner_id is not None):
        error = SubTimeoutError()

    raise error
1325
1326
async def await_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Await ``await_tasks_fastest_custom_explicit`` using ``current_interface()``.

    All arguments, including ``result_criteria_handler``, are forwarded
    unchanged.

    Returns:
        The ``(coro_id, result)`` pair produced by
        ``await_tasks_fastest_custom_explicit`` (the previous ``List[...]``
        annotation did not match what is actually returned).

    Raises:
        Whatever ``await_tasks_fastest_custom_explicit`` raises.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_custom_explicit(
        i,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1353
1354
# Short aliases: `await_tasks_fastest_custom` takes the interface explicitly
# as its first argument, `await_tasks_fastest_custom_im` obtains it from
# `current_interface()`.
await_tasks_fastest_custom: Callable = await_tasks_fastest_custom_explicit
await_tasks_fastest_custom_im: Callable = await_tasks_fastest_custom_implicit
1357
1358
def wait_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Non-async counterpart of ``await_tasks_fastest_custom_explicit``.

    Sends a ``RunCoro`` request through ``i`` that names
    ``await_tasks_fastest_custom_explicit`` as the coroutine and forwards
    every other argument (including ``result_criteria_handler``) unchanged
    and in the same order.

    Returns:
        The value the ``RunCoro`` request evaluates to.
        NOTE(review): expected to be the ``(coro_id, result)`` pair of
        ``await_tasks_fastest_custom_explicit`` - confirm against the
        ``RunCoro`` service documentation.
    """
    # The request result used to be discarded, which made this function (and
    # `wait_tasks_fastest_custom_implicit`, which delegates to it) always
    # return None.
    return i(
        RunCoro, await_tasks_fastest_custom_explicit,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1384
1385
def wait_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_custom_explicit`` with ``current_interface()``.

    All arguments, including ``result_criteria_handler``, are forwarded
    unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_custom_explicit`` returns. The
        annotation reflects the ``(coro_id, result)`` shape produced by
        ``await_tasks_fastest_custom_explicit`` rather than a list.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_custom_explicit(
        i,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1412
1413
# Short aliases: `wait_tasks_fastest_custom` takes the interface explicitly
# as its first argument, `wait_tasks_fastest_custom_im` obtains it from
# `current_interface()`.
wait_tasks_fastest_custom: Callable = wait_tasks_fastest_custom_explicit
wait_tasks_fastest_custom_im: Callable = wait_tasks_fastest_custom_implicit
1416
1417
1418# =================================================================================
1419
1420
async def await_tasks_atomic_explicit(
        i: Interface, 
        tasks: Union[Task, List[Task]], 
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None, 
        terminate_other_tasks: Optional[bool] = None, 
        terminate_other_tasks_on_success: bool = False, 
        terminate_other_tasks_on_failure: bool = False, 
        terminate_other_tasks_on_timeout: bool = False, 
        wait_for_termination: bool = False, 
        ):
    """Wait on ``tasks`` using ``AtomicInfo`` and return everything it gathered.

    When ``terminate_other_tasks`` is not None it overrides all three
    ``terminate_other_tasks_on_*`` flags. An ``AtomicOnDoneHandler`` is
    attached to every task, then the coroutine waits on the ``AsyncEventBus``
    for a uniquely named event. Afterwards ``aterminate_tasks_explicit`` is
    called with a reason derived from the ``timeout`` / ``failure`` flags of
    the ``AtomicInfo`` object.

    Returns:
        The list produced by ``AtomicInfo.gather()``; stored exceptions are
        returned inside it, not raised here.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        # One switch controls all three per-reason flags.
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic__{uuid4()}'
    info: AtomicInfo = AtomicInfo(wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        loop_bound_interface = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(loop_bound_interface, timeout, info.on_timeout)

    # Each task gets its own handler instance; all of them share `info`.
    for task in tasks:
        task.add_on_done_handler(AtomicOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure, failure over success.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        tasks,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
1476
1477
async def await_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]], 
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None, 
        terminate_other_tasks: Optional[bool] = None, 
        terminate_other_tasks_on_success: bool = False, 
        terminate_other_tasks_on_failure: bool = False, 
        terminate_other_tasks_on_timeout: bool = False, 
        wait_for_termination: bool = False, 
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_explicit`` using ``current_interface()``.

    All arguments are forwarded unchanged; the awaited result of
    ``await_tasks_atomic_explicit`` is returned as-is.
    """
    return await await_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
1502
1503
# Short aliases: `await_tasks_atomic` takes the interface explicitly as its
# first argument, `await_tasks_atomic_im` obtains it from `current_interface()`.
await_tasks_atomic: Callable = await_tasks_atomic_explicit
await_tasks_atomic_im: Callable = await_tasks_atomic_implicit
1506
1507
def wait_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Non-async counterpart of ``await_tasks_atomic_explicit``.

    Sends a ``RunCoro`` request through ``i`` that names
    ``await_tasks_atomic_explicit`` as the coroutine and forwards every other
    argument unchanged and in the same order.

    Returns:
        The value the ``RunCoro`` request evaluates to.
        NOTE(review): expected to be the list returned by
        ``await_tasks_atomic_explicit`` - confirm against the ``RunCoro``
        service documentation.
    """
    # The request result used to be discarded, which made this function (and
    # `wait_tasks_atomic_implicit`, which delegates to it) always return None.
    return i(
        RunCoro, await_tasks_atomic_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1531
1532
def wait_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]], 
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None, 
        terminate_other_tasks: Optional[bool] = None, 
        terminate_other_tasks_on_success: bool = False, 
        terminate_other_tasks_on_failure: bool = False, 
        terminate_other_tasks_on_timeout: bool = False, 
        wait_for_termination: bool = False, 
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_explicit`` with the interface from ``current_interface()``.

    All arguments are forwarded unchanged; the return value is whatever
    ``wait_tasks_atomic_explicit`` returns.
    """
    return wait_tasks_atomic_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
1557
1558
# Short aliases: `wait_tasks_atomic` takes the interface explicitly as its
# first argument, `wait_tasks_atomic_im` obtains it from `current_interface()`.
wait_tasks_atomic: Callable = wait_tasks_atomic_explicit
wait_tasks_atomic_im: Callable = wait_tasks_atomic_implicit
1561
1562
1563# =================================================================================
1564
1565
async def await_tasks_atomic_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait for ``tasks`` in "atomic custom" mode, then run the termination step.

    An ``AtomicCustomInfo`` collects the outcomes: a fresh
    ``AtomicCustomOnDoneHandler`` bound to it is attached to every task. The
    coroutine then awaits an ``AsyncEventBus`` wait request keyed by a unique
    wait id, gathers the results, derives a ``TerminationReason`` from the
    info object and awaits ``aterminate_tasks_explicit``.

    Args:
        i: Interface used for the ``AsyncEventBus`` request; its ``_loop`` is
            used to schedule the timeout timer.
        tasks: A single ``Task`` (wrapped into a list) or a list of tasks.
        result_criteria_handler: Passed to ``AtomicCustomInfo``.
        first_failed_result_formatter: Passed to ``AtomicCustomInfo``.
        terminate_tree: Forwarded to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Forwarded to ``aterminate_tasks_explicit``.
        timeout: When not ``None``, ``AtomicCustomInfo.on_timeout`` is
            scheduled through ``timer_func_run_on`` with this value.
        terminate_other_tasks: When not ``None``, overrides the three
            ``terminate_other_tasks_on_*`` flags.
        terminate_other_tasks_on_success: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: Forwarded to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: Forwarded to ``aterminate_tasks_explicit``.
        wait_for_termination: Forwarded to ``aterminate_tasks_explicit``.

    Returns:
        The value of ``AtomicCustomInfo.gather()``.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty (or otherwise falsy).
    """
    if terminate_other_tasks is not None:
        # The umbrella flag wins over all three per-outcome flags.
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic_custom__{uuid4()}'
    info: AtomicCustomInfo = AtomicCustomInfo(
        wait_id,
        task_list,
        result_criteria_handler,
        first_failed_result_formatter,
    )

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(AtomicCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure, failure over success.
    if info.timeout:
        reason: TerminationReason = TerminationReason.timeout
    elif info.failure:
        reason = TerminationReason.failure
    else:
        reason = TerminationReason.success

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
1628
1629
async def await_tasks_atomic_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_custom_explicit`` with the current interface.

    The interface comes from ``current_interface()``; every other argument is
    forwarded unchanged.

    Returns:
        Whatever ``await_tasks_atomic_custom_explicit`` returns.
    """
    return await await_tasks_atomic_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler=result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
1658
1659
# Short aliases: the bare name is the explicit-interface variant, the ``_im``
# suffix is the variant that looks the interface up via current_interface().
await_tasks_atomic_custom: Callable = await_tasks_atomic_custom_explicit
await_tasks_atomic_custom_im: Callable = await_tasks_atomic_custom_implicit
1662
1663
def wait_tasks_atomic_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_atomic_custom_explicit`` through a ``RunCoro`` request.

    The coroutine function and its arguments are handed to ``i(RunCoro, ...)``
    and the value of that request is returned to the caller.

    NOTE(review): ``i`` itself is not among the forwarded arguments, so
    ``RunCoro`` presumably supplies the interface to the coroutine; confirm
    against the ``RunCoro`` service.

    Returns:
        The value produced by the ``RunCoro`` request (annotated as the list
        of ``(coro_id, result, exception)`` tuples).
    """
    # The request's value used to be discarded, so this function always
    # returned None despite its declared return type.
    return i(RunCoro, await_tasks_atomic_custom_explicit,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
1691
1692
def wait_tasks_atomic_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_custom_explicit`` with the current interface.

    The interface comes from ``current_interface()``; every other argument is
    forwarded unchanged.

    Returns:
        Whatever ``wait_tasks_atomic_custom_explicit`` returns.
    """
    return wait_tasks_atomic_custom_explicit(
        current_interface(),
        tasks,
        result_criteria_handler=result_criteria_handler,
        first_failed_result_formatter=first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
1721
1722
# Short aliases: the bare name is the explicit-interface variant, the ``_im``
# suffix is the variant that looks the interface up via current_interface().
wait_tasks_atomic_custom: Callable = wait_tasks_atomic_custom_explicit
wait_tasks_atomic_custom_im: Callable = wait_tasks_atomic_custom_implicit
1725
1726
1727# =================================================================================
1728
1729
def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Wait for ``task`` to finish and return ``task.result``.

    An on-done handler is attached to the task; when invoked it sends a
    unique wait id to the ``AsyncEventBus`` service with high priority. This
    function then issues an ``AsyncEventBus`` wait request for that id.

    NOTE(review): despite the annotation, the body calls
    ``task.add_on_done_handler`` directly, so only a single task object is
    handled here, not a list.

    Returns:
        ``task.result`` as read after the wait request returns.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    i(AsyncEventBus, wait_request)
    return task.result
1738
1739
def wait_task_implicit(task: Union[Task, List[Task]]):
    """Wait for ``task`` using the current interface and return ``task.result``.

    Delegates to ``wait_task_explicit`` with the interface returned by
    ``current_interface()``, as the other ``*_implicit`` wrappers in this
    module do, instead of carrying a second copy of the waiting logic.

    NOTE(review): despite the annotation, ``wait_task_explicit`` calls
    ``task.add_on_done_handler`` directly, so only a single task object is
    handled, not a list.

    Returns:
        Whatever ``wait_task_explicit`` returns (``task.result``).
    """
    return wait_task_explicit(current_interface(), task)
1749
1750
# Short aliases: the bare name is the explicit-interface variant, the ``_im``
# suffix is the variant that looks the interface up via current_interface().
wait_task = wait_task_explicit
wait_task_im = wait_task_implicit
1753
1754
async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Await completion of ``task`` and return ``task.result``.

    An on-done handler is attached to the task; when invoked it sends a
    unique wait id to the ``AsyncEventBus`` service with high priority. This
    coroutine then awaits an ``AsyncEventBus`` wait request for that id.

    NOTE(review): despite the annotation, the body calls
    ``task.add_on_done_handler`` directly, so only a single task object is
    handled here, not a list.

    Returns:
        ``task.result`` as read after the wait request completes.
    """
    done_event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        put_request_to_service(AsyncEventBus, done_event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(done_event_id)
    await i(AsyncEventBus, wait_request)
    return task.result
1763
1764
async def await_task_implicit(task: Union[Task, List[Task]]):
    """Await completion of ``task`` using the current interface.

    Delegates to ``await_task_explicit`` with the interface returned by
    ``current_interface()``, as the other ``*_implicit`` wrappers in this
    module do, instead of carrying a second copy of the waiting logic.

    NOTE(review): despite the annotation, ``await_task_explicit`` calls
    ``task.add_on_done_handler`` directly, so only a single task object is
    handled, not a list.

    Returns:
        Whatever ``await_task_explicit`` returns (``task.result``).
    """
    return await await_task_explicit(current_interface(), task)
1774
1775
# Short aliases: the bare name is the explicit-interface variant, the ``_im``
# suffix is the variant that looks the interface up via current_interface().
await_task = await_task_explicit
await_task_im = await_task_implicit
class WaitType(enum.IntEnum):
class WaitType(IntEnum):
    """Wait modes; one member per ``*Info`` / ``*OnDoneHandler`` family."""

    normal = 0
    fastest = 1
    fastest_successful = 2
    fastest_exception = 3
    fastest_custom = 4
    atomic = 5
    # Was 5 as well: Enum turns a repeated value into an alias, so
    # ``WaitType.atomic_custom`` used to be the very same member as
    # ``WaitType.atomic`` and the two modes could not be told apart.
    atomic_custom = 6

An enumeration.

normal = <WaitType.normal: 0>
fastest = <WaitType.fastest: 1>
fastest_successful = <WaitType.fastest_successful: 2>
fastest_exception = <WaitType.fastest_exception: 3>
fastest_custom = <WaitType.fastest_custom: 4>
atomic = <WaitType.atomic: 5>
atomic_custom = <WaitType.atomic: 5>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class TasksListIsEmptyError(builtins.Exception):
class TasksListIsEmptyError(Exception):
    """Raised by the wait functions when they are given no tasks to wait for."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class IsNotDoneYetError(builtins.Exception):
class IsNotDoneYetError(Exception):
    """Raised by ``gather()`` of the info classes when the wait is not done yet."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class NormalOnDoneHandler:
class NormalOnDoneHandler:
    """On-done callback that reports a finished task to a ``NormalInfo``."""

    def __init__(self, normal_info: 'NormalInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.normal_info: 'NormalInfo' = normal_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector."""
        outcome = (task.coro_id, task._result, task._exception)
        self.normal_info.add_result(*outcome)
NormalOnDoneHandler( normal_info: NormalInfo)
174    def __init__(self, normal_info: 'NormalInfo') -> None:
175        self.normal_info: 'NormalInfo' = normal_info
normal_info: NormalInfo
class NormalInfo:
class NormalInfo:
    """Collects task outcomes for one wait operation and announces completion.

    Completion is announced once, by sending ``wait_done_id`` to the
    ``AsyncEventBus`` service with high priority. From that moment on
    ``ignore`` is set and further results are dropped.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        self.tasks: List[Task] = tasks
        # Number of recorded results at which the wait counts as complete.
        self.coroutines_num: int = len(tasks)
        self.results: List[Tuple[CoroID, Any, Exception]] = list()
        self.wait_done_id: str = wait_done_id
        # Turned on when the wait completes; add_result() then does nothing.
        self.ignore: bool = False
        self._done: bool = False
        self._timeout: bool = False
        self._failure: bool = False

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record one outcome; mark the wait done once enough have arrived."""
        if self.ignore:
            return

        self.results.append((coro_id, result, exception))
        if len(self.results) >= self.coroutines_num:
            self.done = True

    @property
    def failure(self) -> bool:
        """Failure flag of this wait operation."""
        return self._failure

    @failure.setter
    def failure(self, value: bool) -> None:
        self._failure = value

    @property
    def done(self) -> bool:
        """Whether the wait has completed."""
        return self._done

    @done.setter
    def done(self, value: bool) -> None:
        # Completion is one-way: once done, later assignments are ignored so
        # the completion event is sent at most once.
        if self._done:
            return

        self._done = value
        if value:
            self.ignore = True
            put_request_to_service(AsyncEventBus, self.wait_done_id, None, CoroPriority.high)

    @property
    def timeout(self) -> bool:
        """Whether the wait was completed by ``on_timeout``."""
        return self._timeout

    def on_timeout(self) -> None:
        """Complete the wait because the timeout fired.

        A timer that fires after the wait has already completed is ignored,
        so a finished wait is never relabelled as a timeout afterwards.
        """
        if self._done:
            return

        self._timeout = True
        self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return the recorded results.

        Raises:
            IsNotDoneYetError: If the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        return self.results
NormalInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
182    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
183        self.tasks: List[Task] = tasks
184        self.coroutines_num: int = len(tasks)
185        self.results: List[Tuple[CoroID, Any, Exception]] = list()
186        self.wait_done_id: str = wait_done_id
187        self.ignore: bool = False
188        self._done: bool = False
189        self._timeout: bool = False
190        self._failure: bool = False
tasks: List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]
coroutines_num: int
results: List[Tuple[int, Any, Exception]]
wait_done_id: str
ignore: bool
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
192    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
193        if self.ignore:
194            return
195        
196        self.results.append((coro_id, result, exception))
197        if len(self.results) >= self.coroutines_num:
198            self.done = True
failure: bool
200    @property
201    def failure(self) -> bool:
202        return self._failure
done: bool
208    @property
209    def done(self) -> bool:
210        return self._done
timeout: bool
222    @property
223    def timeout(self) -> bool:
224        return self._timeout
def on_timeout(self) -> None:
226    def on_timeout(self) -> None:
227        self._timeout = True
228        self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
230    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
231        if not self.done:
232            raise IsNotDoneYetError
233        
234        return self.results
class FastestOnDoneHandler:
class FastestOnDoneHandler:
    """On-done callback that reports a finished task to a ``FastestInfo``."""

    def __init__(self, fastest_info: 'FastestInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.fastest_info: 'FastestInfo' = fastest_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector.

        ``add_result`` takes three positional arguments; the values used to
        be wrapped into a single tuple, which made the call raise TypeError.
        """
        self.fastest_info.add_result(task.coro_id, task._result, task._exception)
FastestOnDoneHandler( fastest_info: FastestInfo)
241    def __init__(self, fastest_info: 'FastestInfo') -> None:
242        self.fastest_info: 'FastestInfo' = fastest_info
fastest_info: FastestInfo
class FastestInfo(NormalInfo):
class FastestInfo(NormalInfo):
    """``NormalInfo`` variant that completes as soon as one result arrives."""

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        # Override the base-class count (len(tasks)): a single result is enough.
        self.coroutines_num = 1
FastestInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
249    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
250        super().__init__(wait_done_id, tasks)
251        self.coroutines_num: int = 1
coroutines_num: int
class FastestSuccessfulOnDoneHandler:
class FastestSuccessfulOnDoneHandler:
    """On-done callback that reports a finished task to a ``FastestSuccessfulInfo``."""

    def __init__(self, fastest_successful_info: 'FastestSuccessfulInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.fastest_successful_info: 'FastestSuccessfulInfo' = fastest_successful_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector.

        ``add_result`` takes three positional arguments; the values used to
        be wrapped into a single tuple, which made the call raise TypeError.
        """
        self.fastest_successful_info.add_result(task.coro_id, task._result, task._exception)
FastestSuccessfulOnDoneHandler( fastest_successful_info: FastestSuccessfulInfo)
258    def __init__(self, fastest_successful_info: 'FastestSuccessfulInfo') -> None:
259        self.fastest_successful_info: 'FastestSuccessfulInfo' = fastest_successful_info
fastest_successful_info: FastestSuccessfulInfo
class FastestSuccessfulInfo(NormalInfo):
class FastestSuccessfulInfo(NormalInfo):
    """Completes on the first result without an exception.

    If every task has reported and none succeeded, the wait completes anyway
    and ``gather()`` flags the failure.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        # First outcome whose exception is None; stays None if there is none.
        self.fastest_successful_result: Optional[Tuple[CoroID, Any, Exception]] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish on the first success or when all reported."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        if exception is None:
            self.fastest_successful_result = outcome
            self.done = True
            return

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the winning outcome.

        The element is ``None`` (and ``failure`` is set) when no successful
        outcome was recorded.

        Raises:
            IsNotDoneYetError: If the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        winner = self.fastest_successful_result
        if winner is None:
            self.failure = True

        return [winner]
FastestSuccessfulInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
266    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
267        super().__init__(wait_done_id, tasks)
268        self.fastest_successful_result: Optional[Tuple[CoroID, Any, Exception]] = None
fastest_successful_result: Union[Tuple[int, Any, Exception], NoneType]
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
270    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
271        if self.ignore:
272            return
273        
274        self.results.append((coro_id, result, exception))
275        if exception is None:
276            self.fastest_successful_result = (coro_id, result, exception)
277            self.done = True
278        elif (len(self.results) >= self.coroutines_num):
279            self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
281    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
282        if not self.done:
283            raise IsNotDoneYetError
284        
285        if self.fastest_successful_result is None:
286            self.failure = True
287        
288        return [self.fastest_successful_result]
class FastestExceptionOnDoneHandler:
class FastestExceptionOnDoneHandler:
    """On-done callback that reports a finished task to a ``FastestExceptionInfo``."""

    def __init__(self, fastest_exception_info: 'FastestExceptionInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.fastest_exception_info: 'FastestExceptionInfo' = fastest_exception_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector.

        ``add_result`` takes three positional arguments; the values used to
        be wrapped into a single tuple, which made the call raise TypeError.
        """
        self.fastest_exception_info.add_result(task.coro_id, task._result, task._exception)
FastestExceptionOnDoneHandler( fastest_exception_info: FastestExceptionInfo)
295    def __init__(self, fastest_exception_info: 'FastestExceptionInfo') -> None:
296        self.fastest_exception_info: 'FastestExceptionInfo' = fastest_exception_info
fastest_exception_info: FastestExceptionInfo
class FastestExceptionInfo(NormalInfo):
class FastestExceptionInfo(NormalInfo):
    """Completes on the first result that carries an exception.

    If every task has reported and none raised, the wait completes anyway and
    ``gather()`` flags the failure.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
        super().__init__(wait_done_id, tasks)
        # First outcome whose exception is not None; stays None if there is none.
        self.fastest_exception_result: Optional[Tuple[CoroID, Any, Exception]] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish on the first exception or when all reported."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        if exception is not None:
            self.fastest_exception_result = outcome
            self.done = True
            return

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the winning outcome.

        The element is ``None`` (and ``failure`` is set) when no outcome with
        an exception was recorded.

        Raises:
            IsNotDoneYetError: If the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        winner = self.fastest_exception_result
        if winner is None:
            self.failure = True

        return [winner]
FastestExceptionInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task])
303    def __init__(self, wait_done_id: str, tasks: List[Task]) -> None:
304        super().__init__(wait_done_id, tasks)
305        self.fastest_exception_result: Optional[Tuple[CoroID, Any, Exception]] = None
fastest_exception_result: Union[Tuple[int, Any, Exception], NoneType]
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
307    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
308        if self.ignore:
309            return
310        
311        self.results.append((coro_id, result, exception))
312        if exception is not None:
313            self.fastest_exception_result = (coro_id, result, exception)
314            self.done = True
315        elif (len(self.results) >= self.coroutines_num):
316            self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
318    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
319        if not self.done:
320            raise IsNotDoneYetError
321        
322        if self.fastest_exception_result is None:
323            self.failure = True
324        
325        return [self.fastest_exception_result]
class FastestCustomOnDoneHandler:
class FastestCustomOnDoneHandler:
    """On-done callback that reports a finished task to a ``FastestCustomInfo``."""

    def __init__(self, fastest_custom_info: 'FastestCustomInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.fastest_custom_info: 'FastestCustomInfo' = fastest_custom_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector.

        ``add_result`` takes three positional arguments; the values used to
        be wrapped into a single tuple, which made the call raise TypeError.
        """
        self.fastest_custom_info.add_result(task.coro_id, task._result, task._exception)
FastestCustomOnDoneHandler( fastest_custom_info: FastestCustomInfo)
332    def __init__(self, fastest_custom_info: 'FastestCustomInfo') -> None:
333        self.fastest_custom_info: 'FastestCustomInfo' = fastest_custom_info
fastest_custom_info: FastestCustomInfo
class FastestCustomInfo(NormalInfo):
class FastestCustomInfo(NormalInfo):
    """Completes on the first result accepted by a caller-supplied predicate.

    If every task has reported and none was accepted, the wait completes
    anyway and ``gather()`` flags the failure.
    """

    def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
        super().__init__(wait_done_id, tasks)
        # Predicate called with (coro_id, result, exception) for every outcome.
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
        # First outcome the predicate accepted; stays None if there is none.
        self.fastest_custom_result: Optional[Tuple[CoroID, Any, Exception]] = None

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; finish on the first accepted one or when all reported."""
        if self.ignore:
            return

        outcome = (coro_id, result, exception)
        self.results.append(outcome)
        if self.result_criteria_handler(coro_id, result, exception):
            self.fastest_custom_result = outcome
            self.done = True
            return

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return a one-element list holding the winning outcome.

        The element is ``None`` (and ``failure`` is set) when no outcome was
        accepted by the predicate.

        Raises:
            IsNotDoneYetError: If the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        winner = self.fastest_custom_result
        if winner is None:
            self.failure = True

        return [winner]
FastestCustomInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool])
340    def __init__(self, wait_done_id: str, tasks: List[Task], result_criteria_handler: Callable[[CoroID, Any, Exception], bool]) -> None:
341        super().__init__(wait_done_id, tasks)
342        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
343        self.fastest_custom_result: Optional[Tuple[CoroID, Any, Exception]] = None
result_criteria_handler: Callable[[int, Any, Exception], bool]
fastest_custom_result: Union[Tuple[int, Any, Exception], NoneType]
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
345    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
346        if self.ignore:
347            return
348        
349        self.results.append((coro_id, result, exception))
350        if self.result_criteria_handler(coro_id, result, exception):
351            self.fastest_custom_result = (coro_id, result, exception)
352            self.done = True
353        elif (len(self.results) >= self.coroutines_num):
354            self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
356    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
357        if not self.done:
358            raise IsNotDoneYetError
359        
360        if self.fastest_custom_result is None:
361            self.failure = True
362        
363        return [self.fastest_custom_result]
class AtomicOnDoneHandler:
class AtomicOnDoneHandler:
    """On-done callback that reports a finished task to an ``AtomicInfo``."""

    def __init__(self, atomic_info: 'AtomicInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.atomic_info: 'AtomicInfo' = atomic_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector."""
        outcome = (task.coro_id, task._result, task._exception)
        self.atomic_info.add_result(*outcome)
AtomicOnDoneHandler( atomic_info: AtomicInfo)
370    def __init__(self, atomic_info: 'AtomicInfo') -> None:
371        self.atomic_info: 'AtomicInfo' = atomic_info
atomic_info: AtomicInfo
class AtomicInfo(NormalInfo):
class AtomicInfo(NormalInfo):
    """All-or-nothing collector: the first exception ends the wait as a failure."""

    def __init__(self, wait_done_id: str, tasks: List[Task], include_first_failure: bool = True) -> None:
        super().__init__(wait_done_id, tasks)
        # Whether the outcome that carried the exception is kept in the results.
        self.include_first_failure: bool = include_first_failure

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record an outcome; fail and finish at once if it carries an exception."""
        if self.ignore:
            return

        failed = exception is not None
        if (not failed) or self.include_first_failure:
            self.results.append((coro_id, result, exception))

        if failed:
            self.failure = True
            self.done = True

        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return one entry per task, in the order of ``self.tasks``.

        A task with a recorded outcome yields ``(coro_id, result, exception)``;
        a task without one yields ``None``.

        Raises:
            IsNotDoneYetError: If the wait has not completed yet.
        """
        if not self.done:
            raise IsNotDoneYetError

        # Index recorded outcomes by coro id; a later entry for the same id
        # replaces an earlier one.
        by_coro_id: Dict[CoroID, Tuple[Any, Exception]] = {
            entry[0]: (entry[1], entry[2])
            for entry in self.results
            if entry is not None
        }

        ordered: List[Tuple[CoroID, Any, Exception]] = []
        for task in self.tasks:
            task_coro_id = task.coro_id
            recorded = by_coro_id.get(task_coro_id)
            if recorded is None:
                ordered.append(None)
            else:
                ordered.append((task_coro_id, recorded[0], recorded[1]))

        return ordered
AtomicInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], include_first_failure: bool = True)
378    def __init__(self, wait_done_id: str, tasks: List[Task], include_first_failure: bool = True) -> None:
379        super().__init__(wait_done_id, tasks)
380        self.include_first_failure: bool = include_first_failure
include_first_failure: bool
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
382    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
383        if self.ignore:
384            return
385        
386        if exception is not None:
387            if self.include_first_failure:
388                self.results.append((coro_id, result, exception))
389            
390            self.failure = True
391            self.done = True
392        else:
393            self.results.append((coro_id, result, exception))
394        
395        if len(self.results) >= self.coroutines_num:
396            self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
398    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
399        if not self.done:
400            raise IsNotDoneYetError
401        
402        normalized_results: List[Tuple[CoroID, Any, Exception]] = list()
403        provided_results: Dict[CoroID, Tuple[Any, Exception]] = dict()
404        for result_info in self.results:
405            if result_info is None:
406                continue
407
408            coro_id, result, exception = result_info
409            provided_results[coro_id] = (result, exception)
410        
411        for task in self.tasks:
412            coro_id = task.coro_id
413            if coro_id in provided_results:
414                result, exception = provided_results[coro_id]
415                normalized_results.append((coro_id, result, exception))
416            else:
417                normalized_results.append(None)
418        
419        return normalized_results
class AtomicCustomOnDoneHandler:
class AtomicCustomOnDoneHandler:
    """On-done callback that reports a finished task to an ``AtomicCustomInfo``."""

    def __init__(self, atomic_custom_info: 'AtomicCustomInfo') -> None:
        # Collector that receives the outcome of every task this handler sees.
        self.atomic_custom_info: 'AtomicCustomInfo' = atomic_custom_info

    def __call__(self, task: Task) -> Any:
        """Pass the task's coro id, result and exception to the collector."""
        outcome = (task.coro_id, task._result, task._exception)
        self.atomic_custom_info.add_result(*outcome)
AtomicCustomOnDoneHandler( atomic_custom_info: AtomicCustomInfo)
426    def __init__(self, atomic_custom_info: 'AtomicCustomInfo') -> None:
427        self.atomic_custom_info: 'AtomicCustomInfo' = atomic_custom_info
atomic_custom_info: AtomicCustomInfo
def success_result_criteria_handler(coro_id: int, result: typing.Any, exception: Exception) -> bool:
def success_result_criteria_handler(coro_id: CoroID, result: Any, exception: Exception) -> bool:
    """Accept an outcome only when it carries no exception.

    ``coro_id`` and ``result`` are not inspected.
    """
    no_exception: bool = exception is None
    return no_exception
def exception_result_criteria_handler(coro_id: int, result: typing.Any, exception: Exception) -> bool:
def exception_result_criteria_handler(coro_id: CoroID, result: Any, exception: Exception) -> bool:
    """Accept an outcome only when it carries an exception.

    ``coro_id`` and ``result`` are not inspected.
    """
    has_exception: bool = exception is not None
    return has_exception
def same_first_failed_result_formatter(coro_id: int, result: typing.Any, exception: Exception) -> Any:
def same_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Exception) -> Any:
    """Return the outcome unchanged, as a ``(coro_id, result, exception)`` tuple."""
    unchanged = (coro_id, result, exception)
    return unchanged
def none_first_failed_result_formatter(coro_id: int, result: typing.Any, exception: Exception) -> Any:
def none_first_failed_result_formatter(coro_id: CoroID, result: Any, exception: Exception) -> Any:
    """Discard the outcome: always return ``None``, whatever the arguments."""
    discarded = None
    return discarded
class AtomicCustomInfo(NormalInfo):
class AtomicCustomInfo(NormalInfo):
    """Outcome collector whose failure verdict is decided by a caller-supplied predicate.

    Each reported outcome is checked with ``result_criteria_handler``. The
    first rejected outcome flags the collector as failed and done; otherwise
    the collector becomes done once ``len(self.results)`` reaches
    ``self.coroutines_num``.
    """

    def __init__(
            self,
            wait_done_id: str,
            tasks: List[Task],
            result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
            first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
            ) -> None:
        """Create the collector.

        Args:
            wait_done_id: forwarded unchanged to ``NormalInfo.__init__``.
            tasks: forwarded unchanged to ``NormalInfo.__init__``.
            result_criteria_handler: called as
                ``handler(coro_id, result, exception)``; a falsy return value
                marks the outcome as failed.
            first_failed_result_formatter: produces the entry stored for a
                failed outcome. ``None`` selects
                ``none_first_failed_result_formatter``.
        """
        super().__init__(wait_done_id, tasks)
        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
        if first_failed_result_formatter is None:
            first_failed_result_formatter = none_first_failed_result_formatter

        self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = first_failed_result_formatter

    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
        """Record one task outcome and update the ``failure``/``done`` flags.

        Does nothing when ``self.ignore`` is truthy. Exactly one entry is
        stored per call: the ``(coro_id, result, exception)`` tuple for an
        accepted outcome, or the formatter's return value for a rejected one.
        """
        if self.ignore:
            return

        # The verdict comes only from the custom criteria; an exception by
        # itself neither fails nor finishes the wait.
        if self.result_criteria_handler(coro_id, result, exception):
            entry = (coro_id, result, exception)
        else:
            entry = self.first_failed_result_formatter(coro_id, result, exception)
            self.failure = True
            self.done = True

        self.results.append(entry)
        if len(self.results) >= self.coroutines_num:
            self.done = True

    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
        """Return the stored outcomes ordered like ``self.tasks``.

        Returns:
            One item per task: ``(coro_id, result, exception)`` when an entry
            for that ``coro_id`` was stored, ``None`` otherwise.

        Raises:
            IsNotDoneYetError: when ``self.done`` is falsy.
        """
        if not self.done:
            raise IsNotDoneYetError

        # Index stored entries by coro_id; ``None`` entries are skipped and a
        # later entry for the same id replaces an earlier one.
        # NOTE(review): non-``None`` entries must unpack into exactly three
        # items - confirm custom formatters honour that.
        provided_results: Dict[CoroID, Tuple[Any, Exception]] = dict()
        for result_info in self.results:
            if result_info is None:
                continue

            coro_id, result, exception = result_info
            provided_results[coro_id] = (result, exception)

        normalized_results: List[Tuple[CoroID, Any, Exception]] = list()
        for task in self.tasks:
            coro_id = task.coro_id
            outcome = provided_results.get(coro_id)
            if outcome is None:
                normalized_results.append(None)
            else:
                normalized_results.append((coro_id, *outcome))

        return normalized_results
AtomicCustomInfo( wait_done_id: str, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], first_failed_result_formatter: typing.Union[typing.Callable[[int, typing.Any, Exception], typing.Any], NoneType] = None)
450    def __init__(
451            self, 
452            wait_done_id: str, 
453            tasks: List[Task], 
454            result_criteria_handler: Callable[[CoroID, Any, Exception], bool], 
455            first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
456            ) -> None:
457        super().__init__(wait_done_id, tasks)
458        self.result_criteria_handler: Callable[[CoroID, Any, Exception], bool] = result_criteria_handler
459        self.first_failed_result_formatter: Callable[[CoroID, Any, Exception], Any] = \
460            none_first_failed_result_formatter if first_failed_result_formatter is None else first_failed_result_formatter
result_criteria_handler: Callable[[int, Any, Exception], bool]
first_failed_result_formatter: Callable[[int, Any, Exception], Any]
def add_result(self, coro_id: int, result: typing.Any, exception: Exception) -> None:
462    def add_result(self, coro_id: CoroID, result: Any, exception: Exception) -> None:
463        if self.ignore:
464            return
465        
466        self.results.append((coro_id, result, exception))
467        if (len(self.results) >= self.coroutines_num) or (exception is not None):
468            self.done = True
469        
470        if not self.result_criteria_handler(coro_id, result, exception):
471            result = self.first_failed_result_formatter(coro_id, result, exception)
472            self.failure = True
473            self.done = True
474        else:
475            result = (coro_id, result, exception)
476        
477        self.results.append(result)
478        if len(self.results) >= self.coroutines_num:
479            self.done = True
def gather(self) -> List[Tuple[int, Any, Exception]]:
481    def gather(self) -> List[Tuple[CoroID, Any, Exception]]:
482        if not self.done:
483            raise IsNotDoneYetError
484        
485        normalized_results: List[Tuple[CoroID, Any, Exception]] = list()
486        provided_results: Dict[CoroID, Tuple[Any, Exception]] = dict()
487        for result_info in self.results:
488            if result_info is None:
489                continue
490
491            coro_id, result, exception = result_info
492            provided_results[coro_id] = (result, exception)
493        
494        for task in self.tasks:
495            coro_id = task.coro_id
496            if coro_id in provided_results:
497                result, exception = provided_results[coro_id]
498                normalized_results.append((coro_id, result, exception))
499            else:
500                normalized_results.append(None)
501        
502        return normalized_results
class GracefulTerminationSettings:
class GracefulTerminationSettings:
    """Plain holder for the options of a graceful task termination.

    The constructor only stores its arguments as same-named attributes; no
    validation is performed.
    """

    def __init__(self,
                 phase_time_limit: Optional[float],
                 ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None,
                 first_phase_is_wait: bool = True,
                 last_phase_is_kill: bool = True,
                 ) -> None:
        """Store the termination options.

        Args:
            phase_time_limit: limit value, or ``None``.
                NOTE(review): presumably a per-phase duration - confirm the
                unit against the consumer of these settings.
            ex_type: exception type, ``None`` by default.
            ex_value: exception instance, ``None`` by default.
            ex_traceback: traceback object, ``None`` by default.
            first_phase_is_wait: flag, ``True`` by default.
            last_phase_is_kill: flag, ``True`` by default.
        """
        # Timing.
        self.phase_time_limit: Optional[float] = phase_time_limit
        # Phase flags.
        self.first_phase_is_wait: bool = first_phase_is_wait
        self.last_phase_is_kill: bool = last_phase_is_kill
        # Exception triple.
        self.ex_type: Type[Exception] = ex_type
        self.ex_value: Exception = ex_value
        self.ex_traceback: Any = ex_traceback
GracefulTerminationSettings( phase_time_limit: typing.Union[float, NoneType], ex_type: typing.Type[Exception] = None, ex_value: Exception = None, ex_traceback: typing.Any = None, first_phase_is_wait: bool = True, last_phase_is_kill: bool = True)
509    def __init__(self, 
510                 phase_time_limit: Optional[float], 
511                 ex_type: Type[Exception] = None, ex_value: Exception = None, ex_traceback: Any = None, 
512                 first_phase_is_wait: bool = True,
513                 last_phase_is_kill: bool = True, 
514                 ) -> None:
515        self.phase_time_limit: Optional[float] = phase_time_limit
516        self.ex_type: Type[Exception] = ex_type
517        self.ex_value: Exception = ex_value
518        self.ex_traceback: Any = ex_traceback
519        self.first_phase_is_wait: bool = first_phase_is_wait
520        self.last_phase_is_kill: bool = last_phase_is_kill
phase_time_limit: Union[float, NoneType]
ex_type: Type[Exception]
ex_value: Exception
ex_traceback: Any
first_phase_is_wait: bool
last_phase_is_kill: bool
class TerminationReason(enum.IntEnum):
class TerminationReason(IntEnum):
    """Integer-valued outcome categories used to select a termination policy."""

    # Outcome without failure or timeout.
    success = 0
    # Outcome flagged as failed.
    failure = 1
    # Outcome caused by an expired timeout.
    timeout = 2

An enumeration.

success = <TerminationReason.success: 0>
failure = <TerminationReason.failure: 1>
timeout = <TerminationReason.timeout: 2>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
async def atask_graceful_destroyer( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, task: cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None) -> None:
async def atask_graceful_destroyer(
        i: Interface,
        task: Task,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        ) -> None:
    """Await ``agraceful_coro_destroyer`` for ``task`` with the supplied settings.

    Args:
        i: interface passed through as the first argument.
        task: task whose ``coro_id`` identifies the coroutine to destroy.
        tree: passed through unchanged.
        graceful_termination_settings: source of the time limit, exception
            triple and phase flags.
            NOTE(review): the default is ``None``, yet the attributes are read
            unconditionally, so omitting the settings raises
            ``AttributeError``.
    """
    settings = graceful_termination_settings
    await agraceful_coro_destroyer(
        i,
        settings.phase_time_limit,
        task.coro_id,
        settings.ex_type,
        settings.ex_value,
        settings.ex_traceback,
        tree,
        settings.first_phase_is_wait,
        settings.last_phase_is_kill,
        )
async def aterminate_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Terminate the unfinished tasks when the policy for ``termination_reason`` is enabled.

    Args:
        i: interface used to issue the service requests.
        tasks: candidate tasks; only those whose ``done`` is falsy are touched.
        termination_reason: selects which ``terminate_other_tasks_on_*`` flag
            applies.
        tree: forwarded to ``KillSingleCoroParams`` / ``atask_graceful_destroyer``.
        graceful_termination_settings: ``None`` requests a ``KillCoroList``
            kill; otherwise one ``atask_graceful_destroyer`` coroutine is
            started per unfinished task.
        terminate_other_tasks: when not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags.
        terminate_other_tasks_on_success: policy for ``TerminationReason.success``.
        terminate_other_tasks_on_failure: policy for ``TerminationReason.failure``.
        terminate_other_tasks_on_timeout: policy for ``TerminationReason.timeout``.
        wait_for_termination: graceful mode only - wait for the destroyer
            coroutines to finish before returning.

    Returns:
        ``False`` when ``tasks`` is empty, or when termination is enabled but
        no task is unfinished; ``True`` in every other case (including when
        the policy for ``termination_reason`` is disabled).
    """
    if not tasks:
        return False

    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    termination_enabled = \
        ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
        ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout)
    if not termination_enabled:
        return True

    if graceful_termination_settings is None:
        kill_params: List[KillSingleCoroParams] = [
            KillSingleCoroParams(task.coro_id, tree) for task in tasks if not task.done
        ]
        if not kill_params:
            return False

        # Pass the freshly built parameter list (the previous code referenced
        # a name that is only bound in the graceful branch) and await the
        # request like the other service call in this coroutine.
        await i(KillCoroList, kill_params)
    else:
        destroyer_params: List[PSCP] = [
            PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings)
            for task in tasks if not task.done
        ]
        if not destroyer_params:
            return False

        termination_tasks: List[Task] = await i(PutCoroList, destroyer_params, True)
        if wait_for_termination:
            # Plain wait on the destroyers: no timeout and no further termination.
            await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)

    return True
async def aterminate_tasks_implicit( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``aterminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged and the awaited result is
    returned as-is.
    """
    return await aterminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
        )
async def aterminate_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Terminate the unfinished tasks when the policy for ``termination_reason`` is enabled.

    Args:
        i: interface used to issue the service requests.
        tasks: candidate tasks; only those whose ``done`` is falsy are touched.
        termination_reason: selects which ``terminate_other_tasks_on_*`` flag
            applies.
        tree: forwarded to ``KillSingleCoroParams`` / ``atask_graceful_destroyer``.
        graceful_termination_settings: ``None`` requests a ``KillCoroList``
            kill; otherwise one ``atask_graceful_destroyer`` coroutine is
            started per unfinished task.
        terminate_other_tasks: when not ``None``, overrides all three
            ``terminate_other_tasks_on_*`` flags.
        terminate_other_tasks_on_success: policy for ``TerminationReason.success``.
        terminate_other_tasks_on_failure: policy for ``TerminationReason.failure``.
        terminate_other_tasks_on_timeout: policy for ``TerminationReason.timeout``.
        wait_for_termination: graceful mode only - wait for the destroyer
            coroutines to finish before returning.

    Returns:
        ``False`` when ``tasks`` is empty, or when termination is enabled but
        no task is unfinished; ``True`` in every other case (including when
        the policy for ``termination_reason`` is disabled).
    """
    if not tasks:
        return False

    if terminate_other_tasks is not None:
        terminate_other_tasks_on_success = terminate_other_tasks
        terminate_other_tasks_on_failure = terminate_other_tasks
        terminate_other_tasks_on_timeout = terminate_other_tasks

    termination_enabled = \
        ((TerminationReason.success == termination_reason) and terminate_other_tasks_on_success) or \
        ((TerminationReason.failure == termination_reason) and terminate_other_tasks_on_failure) or \
        ((TerminationReason.timeout == termination_reason) and terminate_other_tasks_on_timeout)
    if not termination_enabled:
        return True

    if graceful_termination_settings is None:
        kill_params: List[KillSingleCoroParams] = [
            KillSingleCoroParams(task.coro_id, tree) for task in tasks if not task.done
        ]
        if not kill_params:
            return False

        # Pass the freshly built parameter list (the previous code referenced
        # a name that is only bound in the graceful branch) and await the
        # request like the other service call in this coroutine.
        await i(KillCoroList, kill_params)
    else:
        destroyer_params: List[PSCP] = [
            PSCP(atask_graceful_destroyer, task, tree, graceful_termination_settings)
            for task in tasks if not task.done
        ]
        if not destroyer_params:
            return False

        termination_tasks: List[Task] = await i(PutCoroList, destroyer_params, True)
        if wait_for_termination:
            # Plain wait on the destroyers: no timeout and no further termination.
            await await_tasks_explicit(i, termination_tasks, False, None, None, False, False, False, False)

    return True
async def aterminate_tasks_im( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
async def aterminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``aterminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged and the awaited result is
    returned as-is.
    """
    return await aterminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
        )
def terminate_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Run ``aterminate_tasks_explicit`` through the ``RunCoro`` service and return its result.

    Every argument except ``i`` is forwarded unchanged.
    NOTE(review): ``i`` itself is not forwarded - presumably ``RunCoro``
    supplies the interface of the coroutine it starts; confirm against the
    service.

    Returns:
        Whatever the ``RunCoro`` request yields (previously the value was
        dropped and ``None`` was returned despite the ``bool`` annotation).
    """
    return i(
        RunCoro, aterminate_tasks_explicit,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
def terminate_tasks_implicit( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``terminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged and the callee's return value
    is returned as-is.
    """
    return terminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
        )
def terminate_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_explicit(
        i: Interface,
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Run ``aterminate_tasks_explicit`` through the ``RunCoro`` service and return its result.

    Every argument except ``i`` is forwarded unchanged.
    NOTE(review): ``i`` itself is not forwarded - presumably ``RunCoro``
    supplies the interface of the coroutine it starts; confirm against the
    service.

    Returns:
        Whatever the ``RunCoro`` request yields (previously the value was
        dropped and ``None`` was returned despite the ``bool`` annotation).
    """
    return i(
        RunCoro, aterminate_tasks_explicit,
        tasks,
        termination_reason,
        tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
def terminate_tasks_im( tasks: typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task], termination_reason: TerminationReason, tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> bool:
def terminate_tasks_implicit(
        tasks: List[Task],
        termination_reason: TerminationReason,
        tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> bool:
    """Call ``terminate_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged and the callee's return value
    is returned as-is.
    """
    return terminate_tasks_explicit(
        current_interface(),
        tasks,
        termination_reason,
        tree=tree,
        graceful_termination_settings=graceful_termination_settings,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
        )
async def await_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait for ``tasks``, apply the termination policy, and return the gathered results.

    Args:
        i: interface used for the event-bus wait and for termination.
        tasks: a single ``Task`` or a list of tasks to wait for.
        terminate_tree: passed to ``aterminate_tasks_explicit`` as ``tree``.
        graceful_termination_settings: passed to ``aterminate_tasks_explicit``.
        timeout: when not ``None``, ``normal_info.on_timeout`` is scheduled
            with ``timer_func_run_on`` using this value.
        terminate_other_tasks: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: passed to ``aterminate_tasks_explicit``.
        wait_for_termination: passed to ``aterminate_tasks_explicit``.

    Returns:
        The list produced by ``NormalInfo.gather()`` for this wait.

    Raises:
        TasksListIsEmptyError: when ``tasks`` is empty.
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    # Unique event name on which the completion of this particular wait is awaited.
    normal_wait_id: str = f'wait_task__normal__{uuid4()}'
    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, normal_info.on_timeout)

    for another_task in tasks:
        another_task.add_on_done_handler(NormalOnDoneHandler(normal_info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(normal_wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = normal_info.gather()

    # A timeout takes precedence over a failure when choosing the reason.
    if normal_info.timeout:
        termination_reason: TerminationReason = TerminationReason.timeout
    elif normal_info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    # Await the coroutine variant: the synchronous ``terminate_tasks_explicit``
    # does not return an awaitable, so awaiting it raised ``TypeError``.
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
    return results
async def await_tasks_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``await_tasks_explicit`` with the interface from ``current_interface()``.

    All other arguments are forwarded unchanged and the awaited result is
    returned as-is.
    """
    return await await_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
        )
async def await_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Wait for ``tasks``, apply the termination policy, and return the gathered results.

    Args:
        i: interface used for the event-bus wait and for termination.
        tasks: a single ``Task`` or a list of tasks to wait for.
        terminate_tree: passed to ``aterminate_tasks_explicit`` as ``tree``.
        graceful_termination_settings: passed to ``aterminate_tasks_explicit``.
        timeout: when not ``None``, ``normal_info.on_timeout`` is scheduled
            with ``timer_func_run_on`` using this value.
        terminate_other_tasks: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_failure: passed to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_timeout: passed to ``aterminate_tasks_explicit``.
        wait_for_termination: passed to ``aterminate_tasks_explicit``.

    Returns:
        The list produced by ``NormalInfo.gather()`` for this wait.

    Raises:
        TasksListIsEmptyError: when ``tasks`` is empty.
    """
    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    # Unique event name on which the completion of this particular wait is awaited.
    normal_wait_id: str = f'wait_task__normal__{uuid4()}'
    normal_info: NormalInfo = NormalInfo(normal_wait_id, tasks)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, normal_info.on_timeout)

    for another_task in tasks:
        another_task.add_on_done_handler(NormalOnDoneHandler(normal_info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(normal_wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = normal_info.gather()

    # A timeout takes precedence over a failure when choosing the reason.
    if normal_info.timeout:
        termination_reason: TerminationReason = TerminationReason.timeout
    elif normal_info.failure:
        termination_reason = TerminationReason.failure
    else:
        termination_reason = TerminationReason.success

    # Await the coroutine variant: the synchronous ``terminate_tasks_explicit``
    # does not return an awaitable, so awaiting it raised ``TypeError``.
    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
    return results
async def await_tasks_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_explicit`` with the interface from ``current_interface()``.

    The only thing this wrapper adds is the interface lookup; every argument
    is forwarded unchanged and the awaited result is returned as is.
    """
    # Create the coroutine with the looked-up interface first, then await it.
    pending = await_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
    return await pending
def wait_tasks_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_explicit`` through a ``RunCoro`` request and return its result.

    This is the non-``async`` entry point: rather than awaiting the coroutine
    function it issues ``i(RunCoro, await_tasks_explicit, ...)`` with the
    caller's arguments in their original positional order.

    Args:
        i: Interface on which the ``RunCoro`` request is issued. It is not
            forwarded explicitly; presumably ``RunCoro`` supplies the
            interface as the coroutine's first parameter - TODO confirm.
        tasks: One task or a list of tasks, forwarded unchanged.
        terminate_tree: Forwarded unchanged.
        graceful_termination_settings: Forwarded unchanged.
        timeout: Forwarded unchanged.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        The value produced by the ``RunCoro`` request, presumably the list
        returned by ``await_tasks_explicit``.
    """
    # The request's value has to be returned: dropping it would make this
    # function yield None regardless of what the coroutine produced.
    return i(
        RunCoro,
        await_tasks_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_explicit`` with the interface from ``current_interface()``.

    All arguments are forwarded unchanged and the callee's return value is
    handed back to the caller as is.
    """
    # Only the interface lookup happens here; the rest is plain delegation.
    return wait_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_explicit`` through a ``RunCoro`` request and return its result.

    This is the non-``async`` entry point: rather than awaiting the coroutine
    function it issues ``i(RunCoro, await_tasks_explicit, ...)`` with the
    caller's arguments in their original positional order.

    Args:
        i: Interface on which the ``RunCoro`` request is issued. It is not
            forwarded explicitly; presumably ``RunCoro`` supplies the
            interface as the coroutine's first parameter - TODO confirm.
        tasks: One task or a list of tasks, forwarded unchanged.
        terminate_tree: Forwarded unchanged.
        graceful_termination_settings: Forwarded unchanged.
        timeout: Forwarded unchanged.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        The value produced by the ``RunCoro`` request, presumably the list
        returned by ``await_tasks_explicit``.
    """
    # The request's value has to be returned: dropping it would make this
    # function yield None regardless of what the coroutine produced.
    return i(
        RunCoro,
        await_tasks_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_explicit`` with the interface from ``current_interface()``.

    All arguments are forwarded unchanged and the callee's return value is
    handed back to the caller as is.
    """
    # Only the interface lookup happens here; the rest is plain delegation.
    return wait_tasks_explicit(
        current_interface(),
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def await_tasks_fastest_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the first entry reported by a ``FastestInfo`` tracker and return it.

    A single ``Task`` is wrapped into a one-element list. Every task gets a
    ``FastestOnDoneHandler`` bound to one shared ``FastestInfo``; the coroutine
    then waits on an ``AsyncEventBus`` event whose name is unique to this call
    and afterwards passes the task list to ``aterminate_tasks_explicit``
    together with the termination reason read from the tracker.

    Args:
        i: Interface used for the timer, the event-bus wait and termination.
        tasks: One task or a list of tasks to watch.
        terminate_tree: Passed through to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Passed through to
            ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            via ``timer_func_run_on`` with this value.
        terminate_other_tasks: Passed through to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Passed through likewise.
        terminate_other_tasks_on_failure: Passed through likewise.
        terminate_other_tasks_on_timeout: Passed through likewise.
        wait_for_termination: Passed through likewise.

    Returns:
        ``(coro_id, result)`` taken from the first gathered entry, or
        ``(None, None)`` when ``gather()`` returns nothing.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
        SubTimeoutError: If the first entry holds a ``TimeoutError`` together
            with a coroutine id that is not ``None``.
        Exception: Any other exception stored in the first gathered entry.
    """
    watched: List[Task] = [tasks] if isinstance(tasks, Task) else tasks
    if not watched:
        raise TasksListIsEmptyError

    # A per-call unique name keeps concurrent waits from sharing an event.
    event_name: str = f'wait_task__fastest__{uuid4()}'
    tracker: FastestInfo = FastestInfo(event_name, watched)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    for watched_task in watched:
        watched_task.add_on_done_handler(FastestOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(event_name))
    gathered: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # Timeout takes precedence over failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        watched,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    winner_id, winner_value, winner_error = gathered[0]
    if winner_error is None:
        return winner_id, winner_value

    # NOTE(review): presumably a TimeoutError tied to a coroutine id came from
    # inside that task rather than from this wait's own timer, hence the
    # distinct exception type - confirm against FastestInfo.on_timeout.
    if (winner_id is not None) and isinstance(winner_error, TimeoutError):
        raise SubTimeoutError()

    raise winner_error
async def await_tasks_fastest_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Await ``await_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged in its original positional order.

    Returns:
        The ``(coro_id, result)`` pair returned by
        ``await_tasks_fastest_explicit``. The return annotation mirrors that
        function's ``Tuple[CoroID, Any]`` rather than a list of triples.

    Raises:
        Whatever ``await_tasks_fastest_explicit`` raises; nothing is caught here.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def await_tasks_fastest( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the first entry reported by a ``FastestInfo`` tracker and return it.

    A single ``Task`` is wrapped into a one-element list. Every task gets a
    ``FastestOnDoneHandler`` bound to one shared ``FastestInfo``; the coroutine
    then waits on an ``AsyncEventBus`` event whose name is unique to this call
    and afterwards passes the task list to ``aterminate_tasks_explicit``
    together with the termination reason read from the tracker.

    Args:
        i: Interface used for the timer, the event-bus wait and termination.
        tasks: One task or a list of tasks to watch.
        terminate_tree: Passed through to ``aterminate_tasks_explicit``.
        graceful_termination_settings: Passed through to
            ``aterminate_tasks_explicit``.
        timeout: When not ``None``, the tracker's ``on_timeout`` is scheduled
            via ``timer_func_run_on`` with this value.
        terminate_other_tasks: Passed through to ``aterminate_tasks_explicit``.
        terminate_other_tasks_on_success: Passed through likewise.
        terminate_other_tasks_on_failure: Passed through likewise.
        terminate_other_tasks_on_timeout: Passed through likewise.
        wait_for_termination: Passed through likewise.

    Returns:
        ``(coro_id, result)`` taken from the first gathered entry, or
        ``(None, None)`` when ``gather()`` returns nothing.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
        SubTimeoutError: If the first entry holds a ``TimeoutError`` together
            with a coroutine id that is not ``None``.
        Exception: Any other exception stored in the first gathered entry.
    """
    watched: List[Task] = [tasks] if isinstance(tasks, Task) else tasks
    if not watched:
        raise TasksListIsEmptyError

    # A per-call unique name keeps concurrent waits from sharing an event.
    event_name: str = f'wait_task__fastest__{uuid4()}'
    tracker: FastestInfo = FastestInfo(event_name, watched)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_target = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(timer_target, timeout, tracker.on_timeout)

    for watched_task in watched:
        watched_task.add_on_done_handler(FastestOnDoneHandler(tracker))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(event_name))
    gathered: List[Tuple[CoroID, Any, Exception]] = tracker.gather()

    # Timeout takes precedence over failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if tracker.timeout
        else TerminationReason.failure if tracker.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        watched,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    winner_id, winner_value, winner_error = gathered[0]
    if winner_error is None:
        return winner_id, winner_value

    # NOTE(review): presumably a TimeoutError tied to a coroutine id came from
    # inside that task rather than from this wait's own timer, hence the
    # distinct exception type - confirm against FastestInfo.on_timeout.
    if (winner_id is not None) and isinstance(winner_error, TimeoutError):
        raise SubTimeoutError()

    raise winner_error
async def await_tasks_fastest_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Await ``await_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged in its original positional order.

    Returns:
        The ``(coro_id, result)`` pair returned by
        ``await_tasks_fastest_explicit``. The return annotation mirrors that
        function's ``Tuple[CoroID, Any]`` rather than a list of triples.

    Raises:
        Whatever ``await_tasks_fastest_explicit`` raises; nothing is caught here.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_explicit`` through a ``RunCoro`` request and return its result.

    This is the non-``async`` entry point: rather than awaiting the coroutine
    function it issues ``i(RunCoro, await_tasks_fastest_explicit, ...)`` with
    the caller's arguments in their original positional order.

    Args:
        i: Interface on which the ``RunCoro`` request is issued. It is not
            forwarded explicitly; presumably ``RunCoro`` supplies the
            interface as the coroutine's first parameter - TODO confirm.
        tasks: One task or a list of tasks, forwarded unchanged.
        terminate_tree: Forwarded unchanged.
        graceful_termination_settings: Forwarded unchanged.
        timeout: Forwarded unchanged.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        The value produced by the ``RunCoro`` request, presumably the
        ``(coro_id, result)`` pair returned by ``await_tasks_fastest_explicit``
        (hence the ``Tuple[CoroID, Any]`` annotation).
    """
    # The request's value has to be returned: dropping it would make this
    # function yield None regardless of what the coroutine produced.
    return i(
        RunCoro,
        await_tasks_fastest_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged in its original positional order.

    Returns:
        The value returned by ``wait_tasks_fastest_explicit``. It is intended
        to be the ``(coro_id, result)`` pair that
        ``await_tasks_fastest_explicit`` produces, which is why the annotation
        is ``Tuple[CoroID, Any]`` rather than a list of triples.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run ``await_tasks_fastest_explicit`` through a ``RunCoro`` request and return its result.

    This is the non-``async`` entry point: rather than awaiting the coroutine
    function it issues ``i(RunCoro, await_tasks_fastest_explicit, ...)`` with
    the caller's arguments in their original positional order.

    Args:
        i: Interface on which the ``RunCoro`` request is issued. It is not
            forwarded explicitly; presumably ``RunCoro`` supplies the
            interface as the coroutine's first parameter - TODO confirm.
        tasks: One task or a list of tasks, forwarded unchanged.
        terminate_tree: Forwarded unchanged.
        graceful_termination_settings: Forwarded unchanged.
        timeout: Forwarded unchanged.
        terminate_other_tasks: Forwarded unchanged.
        terminate_other_tasks_on_success: Forwarded unchanged.
        terminate_other_tasks_on_failure: Forwarded unchanged.
        terminate_other_tasks_on_timeout: Forwarded unchanged.
        wait_for_termination: Forwarded unchanged.

    Returns:
        The value produced by the ``RunCoro`` request, presumably the
        ``(coro_id, result)`` pair returned by ``await_tasks_fastest_explicit``
        (hence the ``Tuple[CoroID, Any]`` annotation).
    """
    # The request's value has to be returned: dropping it would make this
    # function yield None regardless of what the coroutine produced.
    return i(
        RunCoro,
        await_tasks_fastest_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Call ``wait_tasks_fastest_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged in its original positional order.

    Returns:
        The value returned by ``wait_tasks_fastest_explicit``. It is intended
        to be the ``(coro_id, result)`` pair that
        ``await_tasks_fastest_explicit`` produces, which is why the annotation
        is ``Tuple[CoroID, Any]`` rather than a list of triples.
    """
    i: Interface = current_interface()
    return wait_tasks_fastest_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def await_tasks_fastest_successful_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
 967async def await_tasks_fastest_successful_explicit(
 968        i: Interface, 
 969        tasks: Union[Task, List[Task]], 
 970        terminate_tree: bool = False,
 971        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
 972        timeout: Optional[float] = None, 
 973        terminate_other_tasks: Optional[bool] = None, 
 974        terminate_other_tasks_on_success: bool = False, 
 975        terminate_other_tasks_on_failure: bool = False, 
 976        terminate_other_tasks_on_timeout: bool = False, 
 977        wait_for_termination: bool = False, 
 978        ) -> Tuple[CoroID, Any]:
 979    if isinstance(tasks, Task):
 980        tasks = [tasks]
 981    
 982    if not tasks:
 983        raise TasksListIsEmptyError
 984
 985    fastest_successful_wait_id: str = str(f'wait_task__fastest_successful__{uuid4()}')
 986    fastest_successful_info: FastestSuccessfulInfo = FastestSuccessfulInfo(fastest_successful_wait_id, tasks)
 987
 988    if timeout is not None:
 989        # TODO: implement discard timer request when needed in order to improve performance
 990        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, fastest_successful_info.on_timeout)
 991
 992    for another_task in tasks:
 993        another_task.add_on_done_handler(FastestSuccessfulOnDoneHandler(fastest_successful_info))
 994    
 995    await i(AsyncEventBus, AsyncEventBusRequest().wait(fastest_successful_wait_id))
 996    results: List[Tuple[CoroID, Any, Exception]] = fastest_successful_info.gather()
 997    if fastest_successful_info.timeout:
 998        termination_reason: TerminationReason = TerminationReason.timeout
 999    elif fastest_successful_info.failure:
1000        termination_reason = TerminationReason.failure
1001    else:
1002        termination_reason = TerminationReason.success
1003
1004    await aterminate_tasks_explicit(
1005        i, 
1006        tasks, 
1007        termination_reason, 
1008        terminate_tree, 
1009        graceful_termination_settings, 
1010        terminate_other_tasks, 
1011        terminate_other_tasks_on_success, 
1012        terminate_other_tasks_on_failure, 
1013        terminate_other_tasks_on_timeout, 
1014        wait_for_termination, 
1015        )
1016    if not results:
1017        return None, None
1018    
1019    coro_id, result, exception = results[0]
1020    if exception is not None:
1021        if isinstance(exception, TimeoutError) and (coro_id is not None):
1022            exception = SubTimeoutError()
1023        
1024        raise exception
1025    
1026    return coro_id, result
async def await_tasks_fastest_successful_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Await ``await_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    Every argument is forwarded unchanged in its original positional order.

    Returns:
        The ``(coro_id, result)`` pair returned by
        ``await_tasks_fastest_successful_explicit``. The return annotation
        mirrors that function's ``Tuple[CoroID, Any]`` rather than a list of
        triples.

    Raises:
        Whatever ``await_tasks_fastest_successful_explicit`` raises; nothing is
        caught here.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
async def await_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait for the fastest-successful event over ``tasks`` and return its first gathered entry.

    A single ``Task`` is wrapped into a one-element list. One
    ``FastestSuccessfulInfo`` is shared by a ``FastestSuccessfulOnDoneHandler``
    attached to every task. When ``timeout`` is given, a timer is scheduled to
    call the info object's ``on_timeout``. The coroutine then waits on
    ``AsyncEventBus`` for the generated wait id, gathers the results and awaits
    ``aterminate_tasks_explicit`` with the derived termination reason
    (timeout, failure or success) and the ``terminate_*`` arguments.

    Returns:
        ``(coro_id, result)`` of the first gathered entry, or ``(None, None)``
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
        SubTimeoutError: the first gathered entry carries a ``TimeoutError``
            together with a non-``None`` coro id.
        Exception: any other exception carried by the first gathered entry is
            raised as is.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_successful__{uuid4()}'
    info: FastestSuccessfulInfo = FastestSuccessfulInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        interface_and_loop = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(interface_and_loop, timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestSuccessfulOnDoneHandler(info))

    # Suspend until the event named by wait_id is delivered through AsyncEventBus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout is checked before failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )

    if not gathered:
        return None, None

    coro_id, result, exception = gathered[0]
    if exception is None:
        return coro_id, result

    # A TimeoutError paired with a coro id is replaced by SubTimeoutError before raising.
    if isinstance(exception, TimeoutError) and (coro_id is not None):
        exception = SubTimeoutError()

    raise exception


# Shorter alias: this entry is listed under the name below but shares the implementation above.
await_tasks_fastest_successful = await_tasks_fastest_successful_explicit
async def await_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Run the fastest-successful wait using the interface from ``current_interface()``.

    Every argument is forwarded unchanged, in the same order, to
    ``await_tasks_fastest_successful_explicit``.

    Returns:
        The ``(coro_id, result)`` pair returned by
        ``await_tasks_fastest_successful_explicit`` (``(None, None)`` when it
        gathered no results).

    Raises:
        Whatever ``await_tasks_fastest_successful_explicit`` raises, for
        example ``TasksListIsEmptyError`` when ``tasks`` is empty.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_successful_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
await_tasks_fastest_successful_im = await_tasks_fastest_successful_implicit
def wait_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Non-async entry point for the fastest-successful wait.

    Issues a ``RunCoro`` request through ``i`` that runs
    ``await_tasks_fastest_successful_explicit`` with the given arguments and
    returns the outcome of that request.

    Previously this function ran ``await_tasks_explicit`` and discarded the
    outcome, so callers always received ``None``.

    Returns:
        The value produced by the ``RunCoro`` request - presumably the
        ``(coro_id, result)`` pair of
        ``await_tasks_fastest_successful_explicit``; confirm against the
        ``RunCoro`` service.
    """
    # NOTE(review): the target coroutine's own interface argument is not passed
    # here - presumably RunCoro supplies it; confirm against the RunCoro service.
    return i(
        RunCoro,
        await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
def wait_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    All other arguments are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_successful_explicit`` returns.
    """
    # NOTE(review): the declared return type is kept as-is; confirm it matches
    # what the explicit variant actually returns.
    return wait_tasks_fastest_successful_explicit(
        current_interface(),
        tasks=tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks_fastest_successful_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Non-async entry point for the fastest-successful wait.

    Issues a ``RunCoro`` request through ``i`` that runs
    ``await_tasks_fastest_successful_explicit`` with the given arguments and
    returns the outcome of that request.

    Previously this function ran ``await_tasks_explicit`` and discarded the
    outcome, so callers always received ``None``.

    Returns:
        The value produced by the ``RunCoro`` request - presumably the
        ``(coro_id, result)`` pair of
        ``await_tasks_fastest_successful_explicit``; confirm against the
        ``RunCoro`` service.
    """
    # NOTE(review): the target coroutine's own interface argument is not passed
    # here - presumably RunCoro supplies it; confirm against the RunCoro service.
    return i(
        RunCoro,
        await_tasks_fastest_successful_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
wait_tasks_fastest_successful = wait_tasks_fastest_successful_explicit
def wait_tasks_fastest_successful_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_successful_explicit`` with the interface from ``current_interface()``.

    All other arguments are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_successful_explicit`` returns.
    """
    # NOTE(review): the declared return type is kept as-is; confirm it matches
    # what the explicit variant actually returns.
    return wait_tasks_fastest_successful_explicit(
        current_interface(),
        tasks=tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
wait_tasks_fastest_successful_im = wait_tasks_fastest_successful_implicit
async def await_tasks_fastest_exception_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Wait for the fastest-exception event over ``tasks`` and return the exception of its first gathered entry.

    A single ``Task`` is wrapped into a one-element list. One
    ``FastestExceptionInfo`` is shared by a ``FastestExceptionOnDoneHandler``
    attached to every task. When ``timeout`` is given, a timer is scheduled to
    call the info object's ``on_timeout``. The coroutine then waits on
    ``AsyncEventBus`` for the generated wait id, gathers the results and awaits
    ``aterminate_tasks_explicit`` with the derived termination reason
    (timeout, failure or success) and the ``terminate_*`` arguments.

    Unlike the fastest-successful variant, the exception is returned, not
    raised.

    Returns:
        ``(coro_id, exception)`` of the first gathered entry, or
        ``(None, None)`` when nothing was gathered. A ``TimeoutError`` paired
        with a non-``None`` coro id is replaced by a new ``SubTimeoutError()``.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_exception__{uuid4()}'
    info: FastestExceptionInfo = FastestExceptionInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        interface_and_loop = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(interface_and_loop, timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestExceptionOnDoneHandler(info))

    # Suspend until the event named by wait_id is delivered through AsyncEventBus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout is checked before failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )

    if not gathered:
        return None, None

    # Only the id and the exception of the first entry are used; its result is ignored.
    coro_id, _result, exception = gathered[0]
    replace_with_sub_timeout = (
        (exception is not None)
        and isinstance(exception, TimeoutError)
        and (coro_id is not None)
    )
    if replace_with_sub_timeout:
        exception = SubTimeoutError()

    return coro_id, exception
async def await_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Run the fastest-exception wait using the interface from ``current_interface()``.

    Every argument is forwarded unchanged, in the same order, to
    ``await_tasks_fastest_exception_explicit``.

    Previously this wrapper delegated to ``await_tasks_explicit``, so it never
    performed a fastest-exception wait.

    Returns:
        The ``(coro_id, exception)`` pair returned by
        ``await_tasks_fastest_exception_explicit`` (``(None, None)`` when it
        gathered no results).

    Raises:
        Whatever ``await_tasks_fastest_exception_explicit`` raises, for
        example ``TasksListIsEmptyError`` when ``tasks`` is empty.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
async def await_tasks_fastest_exception_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Wait for the fastest-exception event over ``tasks`` and return the exception of its first gathered entry.

    A single ``Task`` is wrapped into a one-element list. One
    ``FastestExceptionInfo`` is shared by a ``FastestExceptionOnDoneHandler``
    attached to every task. When ``timeout`` is given, a timer is scheduled to
    call the info object's ``on_timeout``. The coroutine then waits on
    ``AsyncEventBus`` for the generated wait id, gathers the results and awaits
    ``aterminate_tasks_explicit`` with the derived termination reason
    (timeout, failure or success) and the ``terminate_*`` arguments.

    Unlike the fastest-successful variant, the exception is returned, not
    raised.

    Returns:
        ``(coro_id, exception)`` of the first gathered entry, or
        ``(None, None)`` when nothing was gathered. A ``TimeoutError`` paired
        with a non-``None`` coro id is replaced by a new ``SubTimeoutError()``.

    Raises:
        TasksListIsEmptyError: ``tasks`` is empty.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_exception__{uuid4()}'
    info: FastestExceptionInfo = FastestExceptionInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        interface_and_loop = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(interface_and_loop, timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestExceptionOnDoneHandler(info))

    # Suspend until the event named by wait_id is delivered through AsyncEventBus.
    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout is checked before failure; success is the fallback.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )

    if not gathered:
        return None, None

    # Only the id and the exception of the first entry are used; its result is ignored.
    coro_id, _result, exception = gathered[0]
    replace_with_sub_timeout = (
        (exception is not None)
        and isinstance(exception, TimeoutError)
        and (coro_id is not None)
    )
    if replace_with_sub_timeout:
        exception = SubTimeoutError()

    return coro_id, exception


# Shorter alias: this entry is listed under the name below but shares the implementation above.
await_tasks_fastest_exception = await_tasks_fastest_exception_explicit
async def await_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Run the fastest-exception wait using the interface from ``current_interface()``.

    Every argument is forwarded unchanged, in the same order, to
    ``await_tasks_fastest_exception_explicit``.

    Previously this wrapper delegated to ``await_tasks_explicit``, so it never
    performed a fastest-exception wait.

    Returns:
        The ``(coro_id, exception)`` pair returned by
        ``await_tasks_fastest_exception_explicit`` (``(None, None)`` when it
        gathered no results).

    Raises:
        Whatever ``await_tasks_fastest_exception_explicit`` raises, for
        example ``TasksListIsEmptyError`` when ``tasks`` is empty.
    """
    i: Interface = current_interface()
    return await await_tasks_fastest_exception_explicit(
        i,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
await_tasks_fastest_exception_im = await_tasks_fastest_exception_implicit
def wait_tasks_fastest_exception_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Non-async entry point for the fastest-exception wait.

    Issues a ``RunCoro`` request through ``i`` that runs
    ``await_tasks_fastest_exception_explicit`` with the given arguments and
    returns the outcome of that request.

    Previously the outcome was discarded, so callers always received ``None``.

    Returns:
        The value produced by the ``RunCoro`` request - presumably the
        ``(coro_id, exception)`` pair of
        ``await_tasks_fastest_exception_explicit``; confirm against the
        ``RunCoro`` service.
    """
    # NOTE(review): the target coroutine's own interface argument is not passed
    # here - presumably RunCoro supplies it; confirm against the RunCoro service.
    return i(
        RunCoro,
        await_tasks_fastest_exception_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )
def wait_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    All other arguments are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_exception_explicit`` returns.
    """
    # NOTE(review): the declared return type is kept as-is; confirm it matches
    # what the explicit variant actually returns.
    return wait_tasks_fastest_exception_explicit(
        current_interface(),
        tasks=tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks_fastest_exception_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Exception]:
    """Non-async entry point for the fastest-exception wait.

    Issues a ``RunCoro`` request through ``i`` that runs
    ``await_tasks_fastest_exception_explicit`` with the given arguments and
    returns the outcome of that request.

    Previously the outcome was discarded, so callers always received ``None``.

    Returns:
        The value produced by the ``RunCoro`` request - presumably the
        ``(coro_id, exception)`` pair of
        ``await_tasks_fastest_exception_explicit``; confirm against the
        ``RunCoro`` service.
    """
    # NOTE(review): the target coroutine's own interface argument is not passed
    # here - presumably RunCoro supplies it; confirm against the RunCoro service.
    return i(
        RunCoro,
        await_tasks_fastest_exception_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
        )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
wait_tasks_fastest_exception = wait_tasks_fastest_exception_explicit
def wait_tasks_fastest_exception_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_fastest_exception_explicit`` with the interface from ``current_interface()``.

    All other arguments are handed over unchanged.

    Returns:
        Whatever ``wait_tasks_fastest_exception_explicit`` returns.
    """
    # NOTE(review): the declared return type is kept as-is; confirm it matches
    # what the explicit variant actually returns.
    return wait_tasks_fastest_exception_explicit(
        current_interface(),
        tasks=tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )


# Shorter alias: this entry is listed under the name below but shares the implementation above.
wait_tasks_fastest_exception_im = wait_tasks_fastest_exception_implicit
async def await_tasks_fastest_custom_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait on `tasks` via a `FastestCustomInfo` tracker and return its first entry.

    Sequence of operations:

    1. A single `Task` is wrapped into a one-element list.
    2. When `timeout` is given, the tracker's `on_timeout` is scheduled
       through `timer_func_run_on`.
    3. A `FastestCustomOnDoneHandler` bound to the tracker is attached to
       every task.
    4. The coroutine waits on the `AsyncEventBus` for the unique wait id.
       Presumably the tracker publishes it once `result_criteria_handler`
       accepts a result or the timeout fires; that logic is not in this
       function.
    5. The tracker's results are gathered and `aterminate_tasks_explicit` is
       awaited with a reason derived from the tracker's `timeout` and
       `failure` flags.

    Returns:
        `(coro_id, result)` from the first gathered entry, or `(None, None)`
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: if `tasks` is empty.
        Exception: the exception stored in the first gathered entry. A
            `TimeoutError` that comes with a non-None coro id is replaced by
            a fresh `SubTimeoutError`.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_custom__{uuid4()}'
    info: FastestCustomInfo = FastestCustomInfo(wait_id, task_list, result_criteria_handler)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # The timeout flag takes precedence over the failure flag.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, error = gathered[0]
    if error is None:
        return coro_id, result

    # A TimeoutError tied to a concrete coroutine id is re-raised as
    # SubTimeoutError (presumably to tell a sub-task's timeout apart from
    # this wait's own timeout - confirm against FastestCustomInfo).
    if isinstance(error, TimeoutError) and (coro_id is not None):
        error = SubTimeoutError()

    raise error
async def await_tasks_fastest_custom_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await `await_tasks_fastest_custom_explicit` using the current interface.

    The interface comes from `current_interface()`; all other arguments are
    forwarded positionally and unchanged, and the awaited result is returned
    as is.

    NOTE(review): the return annotation here is a list of triples, while
    `await_tasks_fastest_custom_explicit` is annotated as returning
    `Tuple[CoroID, Any]` - confirm which one is intended.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return await await_tasks_fastest_custom_explicit(interface, *forwarded)
async def await_tasks_fastest_custom( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> Tuple[int, Any]:
async def await_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> Tuple[CoroID, Any]:
    """Wait on `tasks` via a `FastestCustomInfo` tracker and return its first entry.

    Sequence of operations:

    1. A single `Task` is wrapped into a one-element list.
    2. When `timeout` is given, the tracker's `on_timeout` is scheduled
       through `timer_func_run_on`.
    3. A `FastestCustomOnDoneHandler` bound to the tracker is attached to
       every task.
    4. The coroutine waits on the `AsyncEventBus` for the unique wait id.
       Presumably the tracker publishes it once `result_criteria_handler`
       accepts a result or the timeout fires; that logic is not in this
       function.
    5. The tracker's results are gathered and `aterminate_tasks_explicit` is
       awaited with a reason derived from the tracker's `timeout` and
       `failure` flags.

    Returns:
        `(coro_id, result)` from the first gathered entry, or `(None, None)`
        when nothing was gathered.

    Raises:
        TasksListIsEmptyError: if `tasks` is empty.
        Exception: the exception stored in the first gathered entry. A
            `TimeoutError` that comes with a non-None coro id is replaced by
            a fresh `SubTimeoutError`.
    """
    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__fastest_custom__{uuid4()}'
    info: FastestCustomInfo = FastestCustomInfo(wait_id, task_list, result_criteria_handler)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(FastestCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # The timeout flag takes precedence over the failure flag.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )

    if not gathered:
        return None, None

    coro_id, result, error = gathered[0]
    if error is None:
        return coro_id, result

    # A TimeoutError tied to a concrete coroutine id is re-raised as
    # SubTimeoutError (presumably to tell a sub-task's timeout apart from
    # this wait's own timeout - confirm against FastestCustomInfo).
    if isinstance(error, TimeoutError) and (coro_id is not None):
        error = SubTimeoutError()

    raise error
async def await_tasks_fastest_custom_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await `await_tasks_fastest_custom_explicit` using the current interface.

    The interface comes from `current_interface()`; all other arguments are
    forwarded positionally and unchanged, and the awaited result is returned
    as is.

    NOTE(review): the return annotation here is a list of triples, while
    `await_tasks_fastest_custom_explicit` is annotated as returning
    `Tuple[CoroID, Any]` - confirm which one is intended.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return await await_tasks_fastest_custom_explicit(interface, *forwarded)
def wait_tasks_fastest_custom_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run `await_tasks_fastest_custom_explicit` through the `RunCoro` service.

    This is the non-async entry point. It issues a `RunCoro` request on `i`
    for `await_tasks_fastest_custom_explicit`, forwarding every argument
    except `i` positionally and unchanged.

    NOTE(review): `i` itself is not part of the forwarded arguments;
    presumably `RunCoro` supplies the interface to the coroutine - confirm.

    Returns:
        The value returned by the `RunCoro` request - presumably the result
        of the coroutine, i.e. the `(coro_id, result)` tuple that
        `await_tasks_fastest_custom_explicit` returns. Previously the value
        was dropped and this function always returned `None`.
        NOTE(review): the declared return annotation is a list of triples
        and does not match that tuple - confirm which one is intended.
    """
    # `return` was missing here, so the request's result never reached
    # callers such as `wait_tasks_fastest_custom_implicit`.
    return i(
        RunCoro, await_tasks_fastest_custom_explicit,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_custom_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call `wait_tasks_fastest_custom_explicit` with the current interface.

    The interface is taken from `current_interface()`. Every other argument
    is forwarded positionally and unchanged, and whatever the delegate
    returns is handed back as is.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return wait_tasks_fastest_custom_explicit(interface, *forwarded)
def wait_tasks_fastest_custom( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_custom_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run `await_tasks_fastest_custom_explicit` through the `RunCoro` service.

    This is the non-async entry point. It issues a `RunCoro` request on `i`
    for `await_tasks_fastest_custom_explicit`, forwarding every argument
    except `i` positionally and unchanged.

    NOTE(review): `i` itself is not part of the forwarded arguments;
    presumably `RunCoro` supplies the interface to the coroutine - confirm.

    Returns:
        The value returned by the `RunCoro` request - presumably the result
        of the coroutine, i.e. the `(coro_id, result)` tuple that
        `await_tasks_fastest_custom_explicit` returns. Previously the value
        was dropped and this function always returned `None`.
        NOTE(review): the declared return annotation is a list of triples
        and does not match that tuple - confirm which one is intended.
    """
    # `return` was missing here, so the request's result never reached
    # callers such as `wait_tasks_fastest_custom_implicit`.
    return i(
        RunCoro, await_tasks_fastest_custom_explicit,
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_fastest_custom_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], result_criteria_handler: typing.Callable[[int, typing.Any, Exception], bool], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_fastest_custom_implicit(
        tasks: Union[Task, List[Task]],
        result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Call `wait_tasks_fastest_custom_explicit` with the current interface.

    The interface is taken from `current_interface()`. Every other argument
    is forwarded positionally and unchanged, and whatever the delegate
    returns is handed back as is.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        result_criteria_handler,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return wait_tasks_fastest_custom_explicit(interface, *forwarded)
async def await_tasks_atomic_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False):
async def await_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait on `tasks` via an `AtomicInfo` tracker and return everything it gathered.

    Sequence of operations:

    1. A non-None `terminate_other_tasks` overrides all three
       `terminate_other_tasks_on_*` flags.
    2. A single `Task` is wrapped into a one-element list.
    3. When `timeout` is given, the tracker's `on_timeout` is scheduled
       through `timer_func_run_on`.
    4. An `AtomicOnDoneHandler` bound to the tracker is attached to every
       task.
    5. The coroutine waits on the `AsyncEventBus` for the unique wait id
       (the condition that publishes it lives in `AtomicInfo` and its
       handler, not in this function).
    6. The tracker's results are gathered and `aterminate_tasks_explicit` is
       awaited with a reason derived from the tracker's `timeout` and
       `failure` flags.

    Returns:
        The list produced by `AtomicInfo.gather()`; the local annotation
        describes it as `(coro_id, result, exception)` triples.

    Raises:
        TasksListIsEmptyError: if `tasks` is empty.
    """
    if terminate_other_tasks is not None:
        # One explicit switch wins over the three per-outcome flags.
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic__{uuid4()}'
    info: AtomicInfo = AtomicInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(AtomicOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # The timeout flag takes precedence over the failure flag.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
async def await_tasks_atomic_implicit( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await `await_tasks_atomic_explicit` using the current interface.

    The interface comes from `current_interface()`; all other arguments are
    forwarded positionally and unchanged, and the awaited result is returned
    as is.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return await await_tasks_atomic_explicit(interface, *forwarded)
async def await_tasks_atomic( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False):
async def await_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ):
    """Wait on `tasks` via an `AtomicInfo` tracker and return everything it gathered.

    Sequence of operations:

    1. A non-None `terminate_other_tasks` overrides all three
       `terminate_other_tasks_on_*` flags.
    2. A single `Task` is wrapped into a one-element list.
    3. When `timeout` is given, the tracker's `on_timeout` is scheduled
       through `timer_func_run_on`.
    4. An `AtomicOnDoneHandler` bound to the tracker is attached to every
       task.
    5. The coroutine waits on the `AsyncEventBus` for the unique wait id
       (the condition that publishes it lives in `AtomicInfo` and its
       handler, not in this function).
    6. The tracker's results are gathered and `aterminate_tasks_explicit` is
       awaited with a reason derived from the tracker's `timeout` and
       `failure` flags.

    Returns:
        The list produced by `AtomicInfo.gather()`; the local annotation
        describes it as `(coro_id, result, exception)` triples.

    Raises:
        TasksListIsEmptyError: if `tasks` is empty.
    """
    if terminate_other_tasks is not None:
        # One explicit switch wins over the three per-outcome flags.
        terminate_other_tasks_on_success = \
            terminate_other_tasks_on_failure = \
            terminate_other_tasks_on_timeout = terminate_other_tasks

    task_list = [tasks] if isinstance(tasks, Task) else tasks
    if not task_list:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic__{uuid4()}'
    info: AtomicInfo = AtomicInfo(wait_id, task_list)

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        timer_func_run_on(get_interface_and_loop_with_explicit_loop(i._loop), timeout, info.on_timeout)

    for task in task_list:
        task.add_on_done_handler(AtomicOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    gathered: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # The timeout flag takes precedence over the failure flag.
    reason: TerminationReason = (
        TerminationReason.timeout if info.timeout
        else TerminationReason.failure if info.failure
        else TerminationReason.success
    )

    await aterminate_tasks_explicit(
        i,
        task_list,
        reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return gathered
async def await_tasks_atomic_im( tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
async def await_tasks_atomic_implicit(
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Await `await_tasks_atomic_explicit` using the current interface.

    The interface comes from `current_interface()`; all other arguments are
    forwarded positionally and unchanged, and the awaited result is returned
    as is.
    """
    interface: Interface = current_interface()
    # Same positional order as this function's own parameter list.
    forwarded = (
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return await await_tasks_atomic_explicit(interface, *forwarded)
def wait_tasks_atomic_explicit( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, tasks: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task, typing.List[cengal.parallel_execution.coroutines.coro_standard_services.put_coro.versions.v_0.put_coro.Task]], terminate_tree: bool = False, graceful_termination_settings: typing.Union[GracefulTerminationSettings, NoneType] = None, timeout: typing.Union[float, NoneType] = None, terminate_other_tasks: typing.Union[bool, NoneType] = None, terminate_other_tasks_on_success: bool = False, terminate_other_tasks_on_failure: bool = False, terminate_other_tasks_on_timeout: bool = False, wait_for_termination: bool = False) -> List[Tuple[int, Any, Exception]]:
def wait_tasks_atomic_explicit(
        i: Interface,
        tasks: Union[Task, List[Task]],
        terminate_tree: bool = False,
        graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
        timeout: Optional[float] = None,
        terminate_other_tasks: Optional[bool] = None,
        terminate_other_tasks_on_success: bool = False,
        terminate_other_tasks_on_failure: bool = False,
        terminate_other_tasks_on_timeout: bool = False,
        wait_for_termination: bool = False,
        ) -> List[Tuple[CoroID, Any, Exception]]:
    """Run `await_tasks_atomic_explicit` through the `RunCoro` service.

    This is the non-async entry point. It issues a `RunCoro` request on `i`
    for `await_tasks_atomic_explicit`, forwarding every argument except `i`
    positionally and unchanged.

    NOTE(review): `i` itself is not part of the forwarded arguments;
    presumably `RunCoro` supplies the interface to the coroutine - confirm.

    Returns:
        The value returned by the `RunCoro` request - presumably the result
        of the coroutine, i.e. the gathered list that
        `await_tasks_atomic_explicit` returns. Previously the value was
        dropped and this function always returned `None`.
    """
    # `return` was missing here, so the request's result never reached
    # callers such as `wait_tasks_atomic_implicit`.
    return i(
        RunCoro, await_tasks_atomic_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_atomic_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_explicit`` with the current interface.

    The interface is obtained from ``current_interface()``; every other
    argument is handed over unchanged.

    Returns:
        Whatever ``wait_tasks_atomic_explicit`` returns.
    """
    interface: Interface = current_interface()
    return wait_tasks_atomic_explicit(
        interface,
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
# NOTE(review): this entry is published under the name `wait_tasks_atomic`,
# but the source shown for it is `wait_tasks_atomic_explicit` -- presumably a
# module-level alias; confirm.
def wait_tasks_atomic_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_atomic_explicit`` through the ``RunCoro`` service.

    A ``RunCoro`` request is issued via ``i``; all arguments after ``i`` are
    forwarded positionally and unchanged.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single task or a list of tasks, forwarded as is.
        terminate_tree: Forwarded as is.
        graceful_termination_settings: Forwarded as is.
        timeout: Forwarded as is.
        terminate_other_tasks: Forwarded as is.
        terminate_other_tasks_on_success: Forwarded as is.
        terminate_other_tasks_on_failure: Forwarded as is.
        terminate_other_tasks_on_timeout: Forwarded as is.
        wait_for_termination: Forwarded as is.

    Returns:
        The value produced by the ``RunCoro`` request (presumably the result
        of ``await_tasks_atomic_explicit`` -- confirm against ``RunCoro``).
    """
    # Previously the response was dropped and None was returned, which
    # contradicted the declared return type.
    return i(
        RunCoro,
        await_tasks_atomic_explicit,
        tasks,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
# NOTE(review): this entry is published under the name `wait_tasks_atomic_im`,
# but the source shown for it is `wait_tasks_atomic_implicit` -- presumably a
# module-level alias; confirm.
def wait_tasks_atomic_implicit(
    tasks: Union[Task, List[Task]],
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_explicit`` with the current interface.

    Looks the interface up with ``current_interface()`` and passes all other
    arguments through untouched.

    Returns:
        Whatever ``wait_tasks_atomic_explicit`` returns.
    """
    interface: Interface = current_interface()
    return wait_tasks_atomic_explicit(
        interface,
        tasks,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
async def await_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
):
    """Wait on ``tasks`` via an ``AtomicCustomInfo`` tracker and gather results.

    Steps performed, in order:

    1. A non-None ``terminate_other_tasks`` overrides the three
       ``terminate_other_tasks_on_*`` flags.
    2. A single ``Task`` is wrapped into a one-element list.
    3. An ``AtomicCustomInfo`` is created with a fresh wait id, the tasks,
       ``result_criteria_handler`` and ``first_failed_result_formatter``.
    4. When ``timeout`` is not None, ``on_timeout`` of that info object is
       scheduled with ``timer_func_run_on``.
    5. Every task receives its own ``AtomicCustomOnDoneHandler`` bound to the
       info object.
    6. The coroutine awaits an ``AsyncEventBus`` wait request for the wait id.
    7. Results are gathered, a ``TerminationReason`` is chosen from the info
       object's ``timeout`` / ``failure`` attributes, and
       ``aterminate_tasks_explicit`` is awaited with it.

    Returns:
        The list returned by ``AtomicCustomInfo.gather()``.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        # The umbrella flag wins over all three per-outcome flags.
        terminate_other_tasks_on_success = (
            terminate_other_tasks_on_failure
        ) = terminate_other_tasks_on_timeout = terminate_other_tasks

    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic_custom__{uuid4()}'
    info: AtomicCustomInfo = AtomicCustomInfo(
        wait_id,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
    )

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        loop_binding = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(loop_binding, timeout, info.on_timeout)

    for tracked_task in tasks:
        tracked_task.add_on_done_handler(AtomicCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure; anything else counts as success.
    termination_reason: TerminationReason = (
        TerminationReason.timeout
        if info.timeout
        else (TerminationReason.failure if info.failure else TerminationReason.success)
    )

    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results
async def await_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_custom_explicit`` with the current interface.

    The interface comes from ``current_interface()``; every other argument is
    passed through unchanged.

    Returns:
        Whatever ``await_tasks_atomic_custom_explicit`` returns.
    """
    interface: Interface = current_interface()
    return await await_tasks_atomic_custom_explicit(
        interface,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
# NOTE(review): this entry is published under the name
# `await_tasks_atomic_custom`, but the source shown for it is
# `await_tasks_atomic_custom_explicit` -- presumably a module-level alias;
# confirm.
async def await_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
):
    """Wait on ``tasks`` via an ``AtomicCustomInfo`` tracker and gather results.

    A non-None ``terminate_other_tasks`` overrides the three
    ``terminate_other_tasks_on_*`` flags, and a single ``Task`` is wrapped
    into a list. An ``AtomicCustomInfo`` is built around a fresh wait id, an
    optional timeout callback is scheduled with ``timer_func_run_on``, and an
    ``AtomicCustomOnDoneHandler`` is attached to every task. The coroutine
    then awaits an ``AsyncEventBus`` wait request for that id, gathers the
    results, and awaits ``aterminate_tasks_explicit`` with a
    ``TerminationReason`` derived from the info object's ``timeout`` and
    ``failure`` attributes.

    Returns:
        The list returned by ``AtomicCustomInfo.gather()``.

    Raises:
        TasksListIsEmptyError: If ``tasks`` is empty.
    """
    if terminate_other_tasks is not None:
        # The umbrella flag wins over all three per-outcome flags.
        terminate_other_tasks_on_success = (
            terminate_other_tasks_on_failure
        ) = terminate_other_tasks_on_timeout = terminate_other_tasks

    if isinstance(tasks, Task):
        tasks = [tasks]

    if not tasks:
        raise TasksListIsEmptyError

    wait_id: str = f'wait_task__atomic_custom__{uuid4()}'
    info: AtomicCustomInfo = AtomicCustomInfo(
        wait_id,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
    )

    if timeout is not None:
        # TODO: implement discard timer request when needed in order to improve performance
        loop_binding = get_interface_and_loop_with_explicit_loop(i._loop)
        timer_func_run_on(loop_binding, timeout, info.on_timeout)

    for tracked_task in tasks:
        tracked_task.add_on_done_handler(AtomicCustomOnDoneHandler(info))

    await i(AsyncEventBus, AsyncEventBusRequest().wait(wait_id))
    results: List[Tuple[CoroID, Any, Exception]] = info.gather()

    # Timeout takes precedence over failure; anything else counts as success.
    termination_reason: TerminationReason = (
        TerminationReason.timeout
        if info.timeout
        else (TerminationReason.failure if info.failure else TerminationReason.success)
    )

    await aterminate_tasks_explicit(
        i,
        tasks,
        termination_reason,
        terminate_tree,
        graceful_termination_settings,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
    return results
# NOTE(review): this entry is published under the name
# `await_tasks_atomic_custom_im`, but the source shown for it is
# `await_tasks_atomic_custom_implicit` -- presumably a module-level alias;
# confirm.
async def await_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Await ``await_tasks_atomic_custom_explicit`` with the current interface.

    Looks the interface up with ``current_interface()`` and hands all other
    arguments over untouched.

    Returns:
        Whatever ``await_tasks_atomic_custom_explicit`` returns.
    """
    interface: Interface = current_interface()
    return await await_tasks_atomic_custom_explicit(
        interface,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_atomic_custom_explicit`` through the ``RunCoro`` service.

    Non-async counterpart of ``await_tasks_atomic_custom_explicit``: a
    ``RunCoro`` request is issued via ``i`` and every argument after ``i`` is
    forwarded positionally and unchanged.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single task or a list of tasks, forwarded as is.
        result_criteria_handler: Forwarded as is.
        first_failed_result_formatter: Forwarded as is.
        terminate_tree: Forwarded as is.
        graceful_termination_settings: Forwarded as is.
        timeout: Forwarded as is.
        terminate_other_tasks: Forwarded as is.
        terminate_other_tasks_on_success: Forwarded as is.
        terminate_other_tasks_on_failure: Forwarded as is.
        terminate_other_tasks_on_timeout: Forwarded as is.
        wait_for_termination: Forwarded as is.

    Returns:
        The value produced by the ``RunCoro`` request (presumably the result
        of ``await_tasks_atomic_custom_explicit`` -- confirm against
        ``RunCoro``).
    """
    # The response used to be discarded, so the function always returned None
    # despite its declared return type; hand it back to the caller instead.
    # NOTE(review): ``i`` is not forwarded explicitly -- presumably RunCoro
    # supplies the interface to the spawned coroutine; confirm.
    return i(
        RunCoro,
        await_tasks_atomic_custom_explicit,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
def wait_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_custom_explicit`` with the current interface.

    The interface is obtained from ``current_interface()``; every other
    argument is handed over unchanged.

    Returns:
        Whatever ``wait_tasks_atomic_custom_explicit`` returns.
    """
    interface: Interface = current_interface()
    return wait_tasks_atomic_custom_explicit(
        interface,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
# NOTE(review): this entry is published under the name
# `wait_tasks_atomic_custom`, but the source shown for it is
# `wait_tasks_atomic_custom_explicit` -- presumably a module-level alias;
# confirm.
def wait_tasks_atomic_custom_explicit(
    i: Interface,
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Run ``await_tasks_atomic_custom_explicit`` through the ``RunCoro`` service.

    A ``RunCoro`` request is issued via ``i``; all arguments after ``i`` are
    forwarded positionally and unchanged.

    Args:
        i: Interface used to issue the ``RunCoro`` request.
        tasks: A single task or a list of tasks, forwarded as is.
        result_criteria_handler: Forwarded as is.
        first_failed_result_formatter: Forwarded as is.
        terminate_tree: Forwarded as is.
        graceful_termination_settings: Forwarded as is.
        timeout: Forwarded as is.
        terminate_other_tasks: Forwarded as is.
        terminate_other_tasks_on_success: Forwarded as is.
        terminate_other_tasks_on_failure: Forwarded as is.
        terminate_other_tasks_on_timeout: Forwarded as is.
        wait_for_termination: Forwarded as is.

    Returns:
        The value produced by the ``RunCoro`` request (presumably the result
        of ``await_tasks_atomic_custom_explicit`` -- confirm against
        ``RunCoro``).
    """
    # Previously the response was dropped and None was returned, which
    # contradicted the declared return type.
    return i(
        RunCoro,
        await_tasks_atomic_custom_explicit,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree,
        graceful_termination_settings,
        timeout,
        terminate_other_tasks,
        terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout,
        wait_for_termination,
    )
# NOTE(review): this entry is published under the name
# `wait_tasks_atomic_custom_im`, but the source shown for it is
# `wait_tasks_atomic_custom_implicit` -- presumably a module-level alias;
# confirm.
def wait_tasks_atomic_custom_implicit(
    tasks: Union[Task, List[Task]],
    result_criteria_handler: Callable[[CoroID, Any, Exception], bool],
    first_failed_result_formatter: Optional[Callable[[CoroID, Any, Exception], Any]] = None,
    terminate_tree: bool = False,
    graceful_termination_settings: Optional[GracefulTerminationSettings] = None,
    timeout: Optional[float] = None,
    terminate_other_tasks: Optional[bool] = None,
    terminate_other_tasks_on_success: bool = False,
    terminate_other_tasks_on_failure: bool = False,
    terminate_other_tasks_on_timeout: bool = False,
    wait_for_termination: bool = False,
) -> List[Tuple[CoroID, Any, Exception]]:
    """Call ``wait_tasks_atomic_custom_explicit`` with the current interface.

    Looks the interface up with ``current_interface()`` and passes all other
    arguments through untouched.

    Returns:
        Whatever ``wait_tasks_atomic_custom_explicit`` returns.
    """
    interface: Interface = current_interface()
    return wait_tasks_atomic_custom_explicit(
        interface,
        tasks,
        result_criteria_handler,
        first_failed_result_formatter,
        terminate_tree=terminate_tree,
        graceful_termination_settings=graceful_termination_settings,
        timeout=timeout,
        terminate_other_tasks=terminate_other_tasks,
        terminate_other_tasks_on_success=terminate_other_tasks_on_success,
        terminate_other_tasks_on_failure=terminate_other_tasks_on_failure,
        terminate_other_tasks_on_timeout=terminate_other_tasks_on_timeout,
        wait_for_termination=wait_for_termination,
    )
def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Wait on an ``AsyncEventBus`` event tied to ``task`` and return its result.

    A done-handler is registered on ``task`` that posts a freshly generated
    event id to ``AsyncEventBus`` with high priority. A wait request for the
    same id is then issued through ``i``, after which ``task.result`` is
    returned.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    i(AsyncEventBus, wait_request)
    return task.result
def wait_task_implicit(task: Union[Task, List[Task]]):
    """Wait on an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Uses the interface returned by ``current_interface()``. A done-handler is
    registered on ``task`` that posts a freshly generated event id to
    ``AsyncEventBus`` with high priority. A wait request for the same id is
    then issued through the interface, after which ``task.result`` is
    returned.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    interface: Interface = current_interface()
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    interface(AsyncEventBus, wait_request)
    return task.result
# NOTE(review): this entry is published under the name `wait_task`, but the
# source shown for it is `wait_task_explicit` -- presumably a module-level
# alias; confirm.
def wait_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Wait on an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Registers a done-handler on ``task`` that posts a unique event id to
    ``AsyncEventBus`` with high priority, issues a wait request for that id
    through ``i``, then returns ``task.result``.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    i(AsyncEventBus, wait_request)
    return task.result
# NOTE(review): this entry is published under the name `wait_task_im`, but
# the source shown for it is `wait_task_implicit` -- presumably a module-level
# alias; confirm.
def wait_task_implicit(task: Union[Task, List[Task]]):
    """Wait on an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Uses the interface returned by ``current_interface()``. Registers a
    done-handler on ``task`` that posts a unique event id to
    ``AsyncEventBus`` with high priority, issues a wait request for that id
    through the interface, then returns ``task.result``.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    interface: Interface = current_interface()
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    interface(AsyncEventBus, wait_request)
    return task.result
async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Await an ``AsyncEventBus`` event tied to ``task`` and return its result.

    A done-handler is registered on ``task`` that posts a freshly generated
    event id to ``AsyncEventBus`` with high priority. A wait request for the
    same id is then awaited through ``i``, after which ``task.result`` is
    returned.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    await i(AsyncEventBus, wait_request)
    return task.result
async def await_task_implicit(task: Union[Task, List[Task]]):
    """Await an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Uses the interface returned by ``current_interface()``. A done-handler is
    registered on ``task`` that posts a freshly generated event id to
    ``AsyncEventBus`` with high priority. A wait request for the same id is
    then awaited through the interface, after which ``task.result`` is
    returned.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    interface: Interface = current_interface()
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    await interface(AsyncEventBus, wait_request)
    return task.result
# NOTE(review): this entry is published under the name `await_task`, but the
# source shown for it is `await_task_explicit` -- presumably a module-level
# alias; confirm.
async def await_task_explicit(i: Interface, task: Union[Task, List[Task]]):
    """Await an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Registers a done-handler on ``task`` that posts a unique event id to
    ``AsyncEventBus`` with high priority, awaits a wait request for that id
    through ``i``, then returns ``task.result``.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    await i(AsyncEventBus, wait_request)
    return task.result
# NOTE(review): this entry is published under the name `await_task_im`, but
# the source shown for it is `await_task_implicit` -- presumably a
# module-level alias; confirm.
async def await_task_implicit(task: Union[Task, List[Task]]):
    """Await an ``AsyncEventBus`` event tied to ``task`` and return its result.

    Uses the interface returned by ``current_interface()``. Registers a
    done-handler on ``task`` that posts a unique event id to
    ``AsyncEventBus`` with high priority, awaits a wait request for that id
    through the interface, then returns ``task.result``.

    NOTE(review): the annotation admits a list of tasks, but the body calls
    ``task.add_on_done_handler`` and reads ``task.result`` directly, so only
    a single task object works here -- confirm the intended type.
    """
    interface: Interface = current_interface()
    event_id: str = f'wait_task__{uuid4()}'

    def notify_waiter():
        # Presumably signals the event awaited below -- confirm against
        # put_request_to_service / AsyncEventBus.
        put_request_to_service(AsyncEventBus, event_id, None, CoroPriority.high)

    task.add_on_done_handler(notify_waiter)
    wait_request = AsyncEventBusRequest().wait(event_id)
    await interface(AsyncEventBus, wait_request)
    return task.result