cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['WaitCoro', 'PutSingleCoroParams', 'PSCP', 'WaitCoroRequest', 'CoroutineNotFoundError', 'SubCoroutineNotFoundError', 'TimeoutError', 'SubTimeoutError']
 38
 39from cengal.parallel_execution.coroutines.coro_scheduler import *
 40from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 41from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PutSingleCoroParams, PSCP
 42from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import timer_func_run_on
 43from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import kill_coro_on
 44from cengal.introspection.inspect import get_exception
 45from cengal.data_manipulation.conversion.reinterpret_cast import reinterpret_cast
 46from typing import Any, Optional, Sequence, Tuple, Dict, Set, Union, List, overload, Type
 47
 48
 49class CoroutineNotFoundError(Exception):
 50    pass
 51
 52
 53class SubCoroutineNotFoundError(CoroutineNotFoundError):
 54    pass
 55
 56
 57class TimeoutError(Exception):
 58    pass
 59
 60
 61class SubTimeoutError(TimeoutError):
 62    pass
 63
 64
 65class ServParams:
 66    @overload
 67    def __init__(self, service_request_type: Type[TypedServiceRequest[ServiceResponseTypeVar]], *args, **kwargs) -> ServiceResponseTypeVar: ...
 68
 69    @overload
 70    def __init__(self, service_request: TypedServiceRequest[ServiceResponseTypeVar]) -> ServiceResponseTypeVar: ...
 71
 72    @overload
 73    def __init__(self, service_type: Type[TypedService[ServiceResponseTypeVar]], *args, **kwargs) -> ServiceResponseTypeVar: ...
 74
 75    @overload
 76    def __init__(self, service_type: ServiceType, service_request: TypedServiceRequest[ServiceResponseTypeVar]) -> ServiceResponseTypeVar: ...
 77
 78    @overload
 79    def __init__(self, service_request_type: Type[ServiceRequest], *args, **kwargs) -> ServiceResponseTypeVar: ...
 80
 81    @overload
 82    def __init__(self, service_request: ServiceRequest) -> ServiceResponseTypeVar: ...
 83
 84    @overload
 85    def __init__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...
 86
 87    @overload
 88    def __init__(self, service_type: ServiceType, service_request: ServiceRequest) -> ServiceResponseTypeVar: ...
 89
 90    def __init__(self, service_type, *args, **kwargs) -> None:
 91        self.service_type: NormalizableServiceType = service_type
 92        self.args: Tuple = args
 93        self.kwargs: Dict = kwargs
 94    
 95    def __call__(self) -> Tuple[NormalizableServiceType, Tuple, Dict]:
 96        return self.service_type, self.args, self.kwargs
 97
 98
 99SP = ServParams
100
101
102class WaitCoroRequest(ServiceRequest):
103    def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
104        super().__init__()
105        self.provide_to_request_handler = True
106        self.timeout: Optional[float] = timeout
107        self.kill_on_timeout: bool = kill_on_timeout
108        self.tree: bool = tree
109        self.result_required: bool = result_required
110
111    def single(self, coro_id: CoroID) -> Union[Any, None]:
112        return self._save(0, coro_id)
113
114    def list(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
115        return self._save(1, coro_list)
116
117    def atomic(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
118        return self._save(2, coro_list)
119
120    def fastest(self, coro_list: Sequence[CoroID], num: int = 1, measure_time: bool = False) -> ServiceRequest:
121        return self._save(3, coro_list, num, measure_time)
122
123    def put_single(self, coro_worker: Worker, *args, **kwargs) -> ServiceRequest:
124        return self._save(4, coro_worker, *args, **kwargs)
125
126    def put_list(self, coro_list: Sequence[PutSingleCoroParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
127        return self._save(5, coro_list)
128
129    def put_atomic(self, coro_list: Sequence[PutSingleCoroParams]) -> ServiceRequest:
130        return self._save(6, coro_list)
131
132    def put_fastest(self, coro_list: Sequence[PutSingleCoroParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
133        return self._save(7, coro_list, num, measure_time)
134
135    def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
136        """Creates a coroutine for each service request and waits for the result of each of them.
137
138        Args:
139            serv_params_list (Sequence[ServParams]): _description_
140
141        Returns:
142            List[Tuple[CoroID, Any, Optional[Exception]]]: _description_
143        """        
144        return self._save(8, serv_params_list)
145
146    def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
147        """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).
148
149        Args:
150            serv_params_list (Sequence[ServParams]): _description_
151
152        Returns:
153            ServiceRequest: _description_
154        """        
155        return self._save(9, serv_params_list)
156
157    def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
158        """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)
159
160        Args:
161            serv_params_list (Sequence[ServParams]): _description_
162            num (int, optional): _description_. Defaults to 1.
163            measure_time (bool, optional): _description_. Defaults to False.
164
165        Returns:
166            ServiceRequest: _description_
167        """        
168        return self._save(10, serv_params_list, num, measure_time)
169
170    def serv_and_forget_single(self, serv_params: ServParams) -> None:
171        """Creates a coroutine for a service request and returns immediately.
172
173        Args:
174            serv_params (ServParams): _description_
175
176        Returns:
177            _type_: _description_
178        """        
179        return self._save(11, serv_params)
180
181    def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
182        """Creates a coroutine for each service request and returns immediately.
183
184        Args:
185            serv_params_list (Sequence[ServParams]): _description_
186
187        Returns:
188            _type_: _description_
189        """        
190        return self._save(12, serv_params_list)
191
192
193class SingleMethod(ServiceRequestMethodMixin):
194
195    def __init__(self, service):
196        super().__init__(service)
197        self.single_called_by: Dict[CoroID, CoroID] = dict()  # Dict[CoroID, CoroID] # key - callable; value - requester
198        self.new_single_results: Set[Tuple[CoroID, Any, Optional[BaseException]]] = set()  # (id, result, exception)
199        self.result_required_by: Dict[CoroID, bool] = dict()
200
201    def __call__(self, request: WaitCoroRequest, coro_id: CoroID) -> ServiceProcessingResponse:
202        requester_id: CoroID = self.service.current_caller_coro_info.coro_id
203        coro: CoroWrapperBase = self.service._loop.get_coro(coro_id)
204        if coro is None:
205            return (True, None, CoroutineNotFoundError(coro_id))
206
207        coro.add_on_coro_del_handler(self._on_coro_del_handler)
208        self.single_called_by[coro_id] = requester_id
209        self.result_required_by[requester_id] = request.result_required
210        timeout: Optional[float] = request.timeout
211        if timeout is not None:
212            def timeout_handler(coro_id: CoroID, kill_on_timeout: bool, tree: bool):
213                if coro_id in self.single_called_by:
214                    self.new_single_results.add((coro_id, None, TimeoutError(coro_id)))
215                    self.service.make_live()
216                    if kill_on_timeout:
217                        kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)
218            
219            timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, coro_id, request.kill_on_timeout, request.tree)
220
221        return (False, None, None)
222
223    def full_processing_iteration(self):
224        for coro_id, result, exception in self.new_single_results:
225            try:
226                requester_id: CoroID = self.single_called_by[coro_id]
227                if self.result_required_by[requester_id]:
228                    if CoroutineNotFoundError == type(exception):
229                        reinterpret_cast(SubCoroutineNotFoundError, exception)
230                    elif TimeoutError == type(exception):
231                        reinterpret_cast(SubTimeoutError, exception)
232                    
233                    self.service.register_response(requester_id, result, exception)
234                else:
235                    self.service.register_response(requester_id, None, None)
236                
237                del self.result_required_by[requester_id]
238                del self.single_called_by[coro_id]
239            except KeyError:
240                pass
241
242        self.new_single_results = type(self.new_single_results)()
243
244    def in_work(self) -> bool:
245        return bool(self.new_single_results)
246
247    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
248        if coro.coro_id in self.single_called_by:
249            self.new_single_results.add((coro.coro_id, coro.last_result, coro.exception))
250            self.service.make_live()
251        
252        return True
253
254
255class ListMethod(ServiceRequestMethodMixin):
256
257    def __init__(self, service):
258        super().__init__(service)
259
260    def __call__(self, request: WaitCoroRequest, coro_list: Sequence[Tuple[(Optional[CoroType], Worker, Tuple, Dict)]]) -> ServiceProcessingResponse:
261        requester_id = self.service.current_caller_coro_info.coro_id
262        for coro_id in coro_list:
263            coro = self.service._loop.get_coro(coro_id)
264            if coro is None:
265                return (True, None, CoroutineNotFoundError(coro_id))
266            else:
267                coro.add_on_coro_del_handler(self._on_coro_del_handler)
268                self.list_called_by[coro_id] = requester_id
269                if requester_id not in self.list_wait_by_caller:
270                    self.list_wait_by_caller[requester_id] = set()
271                
272                self.list_wait_by_caller[requester_id].add(coro_id)
273                # timeout: Optional[float] = request.timeout
274                # if timeout is not None:
275                #     def timeout_handler(requester_id: CoroID, coro_id: CoroID, kill_on_timeout: bool, tree: bool):
276                #         if coro_id in self.single_called_by:
277                #             self.new_single_results.add((coro_id, None, TimeoutError(coro_id)))
278                #             self.service.make_live()
279                #             if kill_on_timeout:
280                #                 kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)
281                    
282                #     timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, requester_id, coro_id, request.kill_on_timeout, request.tree)
283
284
285        self.service.make_dead()
286        return (False, None, None)
287
288    def full_processing_iteration(self):
289        raise NotImplementedError
290
291    def in_work(self) -> bool:
292        raise NotImplementedError
293
294    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
295        if coro.coro_id in self.single_called_by:
296            self.new_list_results.add((coro.coro_id, coro.last_result, coro.exception))
297            self.service.make_live()
298        
299        return True
300
301
302class PutListMethod(ServiceRequestMethodMixin):
303
304    def __init__(self, service):
305        super().__init__(service)
306        self.called_by = dict()
307        self.caller_waiting_set: Dict[CoroID, Set[CoroID]] = dict()
308        self.coro_indexes = dict()
309        self.caller_results = dict()
310        self.ready_requesters = set()
311
312    def __call__(self, request: WaitCoroRequest, coro_list: Sequence[PutSingleCoroParams]) -> ServiceProcessingResponse:
313        coroutines_list = list()
314        results = list()
315        requester_id = self.service.current_caller_coro_info.coro_id
316        try:
317            put_coro: PutCoro = self.service._loop.get_service_instance(PutCoro)
318            for coro_request in coro_list:
319                exception = None
320                result_coro_id = None
321                try:
322                    coro_worker, args, kwargs = coro_request()
323                    coro: CoroWrapperBase = put_coro.put_from_other_service(requester_id, coro_worker, *args, **kwargs)
324                    coroutines_list.append(coro)
325                    result_coro_id = coro.coro_id
326                except:
327                    exception = get_exception()
328
329                results.append((result_coro_id, exception))
330        except:
331            return True, results, get_exception()
332        else:
333            if not coroutines_list:
334                return True, results, None
335            
336            if requester_id not in self.caller_waiting_set:
337                self.caller_waiting_set[requester_id] = set()
338            if requester_id not in self.coro_indexes:
339                self.coro_indexes[requester_id] = dict()
340            if requester_id not in self.caller_results:
341                self.caller_results[requester_id] = [
342                 None] * len(coroutines_list)
343            for index, coro in enumerate(coroutines_list):
344                coro_id = coro.coro_id
345                self.called_by[coro_id] = requester_id
346                self.caller_waiting_set[requester_id].add(coro_id)
347                self.coro_indexes[requester_id][coro_id] = index
348                coro.add_on_coro_del_handler(self._on_coro_del_handler)
349
350            timeout: Optional[float] = request.timeout
351            if timeout is not None:
352                def timeout_handler(requester_id: CoroID, kill_on_timeout: bool, tree: bool):
353                    if requester_id not in self.caller_waiting_set:
354                        return
355                    
356                    caller_waiting_set = self.caller_waiting_set[requester_id]
357                    del self.caller_waiting_set[requester_id]
358                    for coro_id in caller_waiting_set:
359                        del self.called_by[coro_id]
360                        index = self.coro_indexes[requester_id][coro_id]
361                        del self.coro_indexes[requester_id][coro_id]
362                        self.caller_results[requester_id][index] = (coro_id, None, coro.exception)  # TODO: should return TimeoutError exception instead
363                        if kill_on_timeout:
364                            kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)
365
366                    del self.coro_indexes[requester_id]
367                    self.ready_requesters.add(requester_id)
368                    self.service.make_live()
369                
370                timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, requester_id, request.kill_on_timeout, request.tree)
371
372            return False, None, None
373
374    def full_processing_iteration(self):
375        ready_requesters_buff = self.ready_requesters
376        self.ready_requesters = type(ready_requesters_buff)()
377        for requester_id in ready_requesters_buff:
378            self.service.register_response(requester_id, self.caller_results[requester_id], None)
379            del self.caller_results[requester_id]
380
381    def in_work(self) -> bool:
382        return bool(self.ready_requesters)
383
384    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
385        coro_id = coro.coro_id
386        if coro_id in self.called_by:
387            requester_id = self.called_by[coro_id]
388            del self.called_by[coro_id]
389            self.caller_waiting_set[requester_id].remove(coro_id)
390            if not self.caller_waiting_set[requester_id]:
391                del self.caller_waiting_set[requester_id]
392            
393            index = self.coro_indexes[requester_id][coro_id]
394            del self.coro_indexes[requester_id][coro_id]
395            self.caller_results[requester_id][index] = (coro.coro_id, coro.last_result, coro.exception)
396            if not self.coro_indexes[requester_id]:
397                del self.coro_indexes[requester_id]
398                self.ready_requesters.add(requester_id)
399                self.service.make_live()
400
401        return True
402
403
404class WaitCoro(Service):
405
406    def __init__(self, loop):
407        super(WaitCoro, self).__init__(loop)
408        self._single = SingleMethod(self)
409        self._list = ListMethod(self)
410        self._put_list = PutListMethod(self)
411
412        self._request_workers = {
413            0:self._single,
414            1:self.not_implemented,
415            2:self.not_implemented,
416            3:self.not_implemented,
417            4:self.not_implemented,
418            5:self._put_list,
419            6:self.not_implemented,
420            7:self.not_implemented
421        }
422
423    def single_task_registration_or_immediate_processing(self, request: Optional[WaitCoroRequest]) -> ServiceProcessingResponse:
424        if request is not None:
425            return self.resolve_request(request)
426            
427        return (True, None, WrongServiceRequestError())
428
429    def full_processing_iteration(self):
430        self._single.full_processing_iteration()
431        self._put_list.full_processing_iteration()
432        if not self.in_work():
433            self.make_dead()
434
435    def in_work(self) -> bool:
436        result: bool = self._single.in_work() or self._put_list.in_work()
437        return self.thrifty_in_work(result)
438
439    def not_implemented(self):
440        raise NotImplementedError
441
442
443WaitCoroRequest.default_service_type = WaitCoro
class WaitCoro(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service):
405class WaitCoro(Service):
406
407    def __init__(self, loop):
408        super(WaitCoro, self).__init__(loop)
409        self._single = SingleMethod(self)
410        self._list = ListMethod(self)
411        self._put_list = PutListMethod(self)
412
413        self._request_workers = {
414            0:self._single,
415            1:self.not_implemented,
416            2:self.not_implemented,
417            3:self.not_implemented,
418            4:self.not_implemented,
419            5:self._put_list,
420            6:self.not_implemented,
421            7:self.not_implemented
422        }
423
424    def single_task_registration_or_immediate_processing(self, request: Optional[WaitCoroRequest]) -> ServiceProcessingResponse:
425        if request is not None:
426            return self.resolve_request(request)
427            
428        return (True, None, WrongServiceRequestError())
429
430    def full_processing_iteration(self):
431        self._single.full_processing_iteration()
432        self._put_list.full_processing_iteration()
433        if not self.in_work():
434            self.make_dead()
435
436    def in_work(self) -> bool:
437        result: bool = self._single.in_work() or self._put_list.in_work()
438        return self.thrifty_in_work(result)
439
440    def not_implemented(self):
441        raise NotImplementedError
WaitCoro(loop)
407    def __init__(self, loop):
408        super(WaitCoro, self).__init__(loop)
409        self._single = SingleMethod(self)
410        self._list = ListMethod(self)
411        self._put_list = PutListMethod(self)
412
413        self._request_workers = {
414            0:self._single,
415            1:self.not_implemented,
416            2:self.not_implemented,
417            3:self.not_implemented,
418            4:self.not_implemented,
419            5:self._put_list,
420            6:self.not_implemented,
421            7:self.not_implemented
422        }
def single_task_registration_or_immediate_processing( self, request: typing.Union[WaitCoroRequest, NoneType]) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
424    def single_task_registration_or_immediate_processing(self, request: Optional[WaitCoroRequest]) -> ServiceProcessingResponse:
425        if request is not None:
426            return self.resolve_request(request)
427            
428        return (True, None, WrongServiceRequestError())
def full_processing_iteration(self):
430    def full_processing_iteration(self):
431        self._single.full_processing_iteration()
432        self._put_list.full_processing_iteration()
433        if not self.in_work():
434            self.make_dead()
def in_work(self) -> bool:
436    def in_work(self) -> bool:
437        result: bool = self._single.in_work() or self._put_list.in_work()
438        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def not_implemented(self):
440    def not_implemented(self):
441        raise NotImplementedError
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
class PutSingleCoroParams:
50class PutSingleCoroParams:
51    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
52        self.coro_worker: AnyWorker = coro_worker
53        self.args: Tuple = args
54        self.kwargs: Dict = kwargs
55    
56    def __call__(self) -> Any:
57        return self.coro_worker, self.args, self.kwargs
PutSingleCoroParams( coro_worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs)
51    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
52        self.coro_worker: AnyWorker = coro_worker
53        self.args: Tuple = args
54        self.kwargs: Dict = kwargs
coro_worker: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, Awaitable[Any]]]
args: Tuple
kwargs: Dict
PSCP = <class 'PutSingleCoroParams'>
class WaitCoroRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
103class WaitCoroRequest(ServiceRequest):
104    def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
105        super().__init__()
106        self.provide_to_request_handler = True
107        self.timeout: Optional[float] = timeout
108        self.kill_on_timeout: bool = kill_on_timeout
109        self.tree: bool = tree
110        self.result_required: bool = result_required
111
112    def single(self, coro_id: CoroID) -> Union[Any, None]:
113        return self._save(0, coro_id)
114
115    def list(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
116        return self._save(1, coro_list)
117
118    def atomic(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
119        return self._save(2, coro_list)
120
121    def fastest(self, coro_list: Sequence[CoroID], num: int = 1, measure_time: bool = False) -> ServiceRequest:
122        return self._save(3, coro_list, num, measure_time)
123
124    def put_single(self, coro_worker: Worker, *args, **kwargs) -> ServiceRequest:
125        return self._save(4, coro_worker, *args, **kwargs)
126
127    def put_list(self, coro_list: Sequence[PutSingleCoroParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
128        return self._save(5, coro_list)
129
130    def put_atomic(self, coro_list: Sequence[PutSingleCoroParams]) -> ServiceRequest:
131        return self._save(6, coro_list)
132
133    def put_fastest(self, coro_list: Sequence[PutSingleCoroParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
134        return self._save(7, coro_list, num, measure_time)
135
136    def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
137        """Creates a coroutine for each service request and waits for the result of each of them.
138
139        Args:
140            serv_params_list (Sequence[ServParams]): _description_
141
142        Returns:
143            List[Tuple[CoroID, Any, Optional[Exception]]]: _description_
144        """        
145        return self._save(8, serv_params_list)
146
147    def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
148        """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).
149
150        Args:
151            serv_params_list (Sequence[ServParams]): _description_
152
153        Returns:
154            ServiceRequest: _description_
155        """        
156        return self._save(9, serv_params_list)
157
158    def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
159        """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)
160
161        Args:
162            serv_params_list (Sequence[ServParams]): _description_
163            num (int, optional): _description_. Defaults to 1.
164            measure_time (bool, optional): _description_. Defaults to False.
165
166        Returns:
167            ServiceRequest: _description_
168        """        
169        return self._save(10, serv_params_list, num, measure_time)
170
171    def serv_and_forget_single(self, serv_params: ServParams) -> None:
172        """Creates a coroutine for a service request and returns immediately.
173
174        Args:
175            serv_params (ServParams): _description_
176
177        Returns:
178            _type_: _description_
179        """        
180        return self._save(11, serv_params)
181
182    def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
183        """Creates a coroutine for each service request and returns immediately.
184
185        Args:
186            serv_params_list (Sequence[ServParams]): _description_
187
188        Returns:
189            _type_: _description_
190        """        
191        return self._save(12, serv_params_list)
WaitCoroRequest( timeout: typing.Union[float, NoneType] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True)
104    def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
105        super().__init__()
106        self.provide_to_request_handler = True
107        self.timeout: Optional[float] = timeout
108        self.kill_on_timeout: bool = kill_on_timeout
109        self.tree: bool = tree
110        self.result_required: bool = result_required
provide_to_request_handler
timeout: Union[float, NoneType]
kill_on_timeout: bool
tree: bool
result_required: bool
def single(self, coro_id: int) -> Union[Any, NoneType]:
112    def single(self, coro_id: CoroID) -> Union[Any, None]:
113        return self._save(0, coro_id)
def list( self, coro_list: typing.Sequence[int]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
115    def list(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
116        return self._save(1, coro_list)
def atomic( self, coro_list: typing.Sequence[int]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
118    def atomic(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
119        return self._save(2, coro_list)
def fastest( self, coro_list: typing.Sequence[int], num: int = 1, measure_time: bool = False) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
121    def fastest(self, coro_list: Sequence[CoroID], num: int = 1, measure_time: bool = False) -> ServiceRequest:
122        return self._save(3, coro_list, num, measure_time)
def put_single( self, coro_worker: typing.Union[collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]], *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
124    def put_single(self, coro_worker: Worker, *args, **kwargs) -> ServiceRequest:
125        return self._save(4, coro_worker, *args, **kwargs)
def put_list( self, coro_list: typing.Sequence[PutSingleCoroParams]) -> List[Tuple[int, Any, Union[Exception, NoneType]]]:
127    def put_list(self, coro_list: Sequence[PutSingleCoroParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
128        return self._save(5, coro_list)
def put_atomic( self, coro_list: typing.Sequence[PutSingleCoroParams]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
130    def put_atomic(self, coro_list: Sequence[PutSingleCoroParams]) -> ServiceRequest:
131        return self._save(6, coro_list)
def put_fastest( self, coro_list: typing.Sequence[PutSingleCoroParams], num: int = 1, measure_time: bool = False) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
133    def put_fastest(self, coro_list: Sequence[PutSingleCoroParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
134        return self._save(7, coro_list, num, measure_time)
def serv_list( self, serv_params_list: typing.Sequence[cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro.ServParams]) -> List[Tuple[int, Any, Union[Exception, NoneType]]]:
136    def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
137        """Creates a coroutine for each service request and waits for the result of each of them.
138
139        Args:
140            serv_params_list (Sequence[ServParams]): _description_
141
142        Returns:
143            List[Tuple[CoroID, Any, Optional[Exception]]]: _description_
144        """        
145        return self._save(8, serv_params_list)

Creates a coroutine for each service request and waits for the result of each of them.

Args: serv_params_list (Sequence[ServParams]): _description_

Returns: List[Tuple[CoroID, Any, Optional[Exception]]]: _description_

def serv_atomic( self, serv_params_list: typing.Sequence[cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro.ServParams]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
147    def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
148        """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).
149
150        Args:
151            serv_params_list (Sequence[ServParams]): _description_
152
153        Returns:
154            ServiceRequest: _description_
155        """        
156        return self._save(9, serv_params_list)

Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).

Args: serv_params_list (Sequence[ServParams]): _description_

Returns: ServiceRequest: _description_

def serv_fastest( self, serv_params_list: typing.Sequence[cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro.ServParams], num: int = 1, measure_time: bool = False) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
158    def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
159        """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)
160
161        Args:
162            serv_params_list (Sequence[ServParams]): _description_
163            num (int, optional): _description_. Defaults to 1.
164            measure_time (bool, optional): _description_. Defaults to False.
165
166        Returns:
167            ServiceRequest: _description_
168        """        
169        return self._save(10, serv_params_list, num, measure_time)

Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)

Args: serv_params_list (Sequence[ServParams]): _description_ num (int, optional): _description_. Defaults to 1. measure_time (bool, optional): _description_. Defaults to False.

Returns: ServiceRequest: _description_

def serv_and_forget_single( self, serv_params: cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro.ServParams) -> None:
171    def serv_and_forget_single(self, serv_params: ServParams) -> None:
172        """Creates a coroutine for a service request and returns immediately.
173
174        Args:
175            serv_params (ServParams): _description_
176
177        Returns:
178            _type_: _description_
179        """        
180        return self._save(11, serv_params)

Creates a coroutine for a service request and returns immediately.

Args: serv_params (ServParams): _description_

Returns: _type_: _description_

def serv_and_forget_list( self, serv_params_list: typing.Sequence[cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro.ServParams]) -> None:
182    def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
183        """Creates a coroutine for each service request and returns immediately.
184
185        Args:
186            serv_params_list (Sequence[ServParams]): _description_
187
188        Returns:
189            _type_: _description_
190        """        
191        return self._save(12, serv_params_list)

Creates a coroutine for each service request and returns immediately.

Args: serv_params_list (Sequence[ServParams]): _description_

Returns: _type_: _description_

default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'WaitCoro'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
interface
i
async_interface
ai
class CoroutineNotFoundError(builtins.Exception):
50class CoroutineNotFoundError(Exception):
51    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SubCoroutineNotFoundError(CoroutineNotFoundError):
54class SubCoroutineNotFoundError(CoroutineNotFoundError):
55    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class TimeoutError(builtins.Exception):
58class TimeoutError(Exception):
59    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SubTimeoutError(TimeoutError):
62class SubTimeoutError(TimeoutError):
63    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args