cengal.parallel_execution.coroutines.coro_standard_services.wait_coro.versions.v_0.wait_coro
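WaitCoro service: lets a coroutine wait for other coroutines (already running, or created by the request) to finish and receive their results. Docstring conventions: http://www.python.org/dev/peps/pep-0257/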
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
WaitCoro service.

Lets a coroutine wait until other coroutines are deleted by the scheduler and
receive their last result / exception. In this version only two request kinds
have a handler: ``WaitCoroRequest.single`` (wait for one existing coroutine)
and ``WaitCoroRequest.put_list`` (create several coroutines and wait for all
of them). Every other request kind is routed to ``WaitCoro.not_implemented``.

Docstrings: http://www.python.org/dev/peps/pep-0257/
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['WaitCoro', 'PutSingleCoroParams', 'PSCP', 'WaitCoroRequest', 'CoroutineNotFoundError', 'SubCoroutineNotFoundError', 'TimeoutError', 'SubTimeoutError']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import PutSingleCoroParams, PSCP
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import timer_func_run_on
from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import kill_coro_on
from cengal.introspection.inspect import get_exception
from cengal.data_manipulation.conversion.reinterpret_cast import reinterpret_cast
from typing import Any, Optional, Sequence, Tuple, Dict, Set, Union, List, overload, Type


class CoroutineNotFoundError(Exception):
    """Reported by this service when the coroutine to wait for is not known to the loop."""


class SubCoroutineNotFoundError(CoroutineNotFoundError):
    """Type a ``CoroutineNotFoundError`` carried by an awaited coroutine is recast to before delivery."""


class TimeoutError(Exception):
    """Reported by this service when a wait exceeds the request's ``timeout``.

    NOTE: intentionally shadows the builtin ``TimeoutError`` inside this
    module; the name is part of the public API (see ``__all__``).
    """


class SubTimeoutError(TimeoutError):
    """Type a ``TimeoutError`` result is recast to before delivery to the requester."""


class ServParams:
    """Bundle of a service (or service request) and the arguments for calling it.

    Calling the instance returns ``(service_type, args, kwargs)``.
    """

    @overload
    def __init__(self, service_request_type: Type[TypedServiceRequest[ServiceResponseTypeVar]], *args, **kwargs) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_request: TypedServiceRequest[ServiceResponseTypeVar]) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_type: Type[TypedService[ServiceResponseTypeVar]], *args, **kwargs) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_type: ServiceType, service_request: TypedServiceRequest[ServiceResponseTypeVar]) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_request_type: Type[ServiceRequest], *args, **kwargs) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_request: ServiceRequest) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_type: ServiceType, *args, **kwargs) -> ServiceResponseTypeVar: ...

    @overload
    def __init__(self, service_type: ServiceType, service_request: ServiceRequest) -> ServiceResponseTypeVar: ...

    def __init__(self, service_type, *args, **kwargs) -> None:
        self.service_type: NormalizableServiceType = service_type
        self.args: Tuple = args
        self.kwargs: Dict = kwargs

    def __call__(self) -> Tuple[NormalizableServiceType, Tuple, Dict]:
        """Return the stored ``(service_type, args, kwargs)`` triple."""
        return self.service_type, self.args, self.kwargs


SP = ServParams


class WaitCoroRequest(ServiceRequest):
    """Request builder for the ``WaitCoro`` service.

    Each method stores a numeric request type plus its arguments through
    ``self._save(...)`` and returns whatever ``_save`` returns.

    Args:
        timeout: seconds to wait before the handler reports a timeout; ``None`` disables the timer.
        kill_on_timeout: when true, the awaited coroutine(s) are killed on timeout.
        tree: forwarded to ``kill_coro_on`` when killing on timeout.
        result_required: when false, the ``single`` handler answers ``(None, None)``
            instead of the awaited coroutine's result / exception.
    """

    def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
        super().__init__()
        # Handlers in this module take the request object as their first argument.
        self.provide_to_request_handler = True
        self.timeout: Optional[float] = timeout
        self.kill_on_timeout: bool = kill_on_timeout
        self.tree: bool = tree
        self.result_required: bool = result_required

    def single(self, coro_id: CoroID) -> Union[Any, None]:
        """Wait for one existing coroutine (request type 0, handled by ``SingleMethod``)."""
        return self._save(0, coro_id)

    def list(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
        """Request type 1. No handler is implemented for it in this version."""
        return self._save(1, coro_list)

    def atomic(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
        """Request type 2. No handler is implemented for it in this version."""
        return self._save(2, coro_list)

    def fastest(self, coro_list: Sequence[CoroID], num: int = 1, measure_time: bool = False) -> ServiceRequest:
        """Request type 3. No handler is implemented for it in this version."""
        return self._save(3, coro_list, num, measure_time)

    def put_single(self, coro_worker: Worker, *args, **kwargs) -> ServiceRequest:
        """Request type 4. No handler is implemented for it in this version."""
        return self._save(4, coro_worker, *args, **kwargs)

    def put_list(self, coro_list: Sequence[PutSingleCoroParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
        """Create a coroutine per item and wait for all of them (request type 5, handled by ``PutListMethod``)."""
        return self._save(5, coro_list)

    def put_atomic(self, coro_list: Sequence[PutSingleCoroParams]) -> ServiceRequest:
        """Request type 6. No handler is implemented for it in this version."""
        return self._save(6, coro_list)

    def put_fastest(self, coro_list: Sequence[PutSingleCoroParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
        """Request type 7. No handler is implemented for it in this version."""
        return self._save(7, coro_list, num, measure_time)

    def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
        """Creates a coroutine for each service request and waits for the result of each of them.

        Request type 8. No handler is implemented for it in this version.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            List[Tuple[CoroID, Any, Optional[Exception]]]: as annotated, one
            ``(coroutine id, result, exception)`` tuple per service request.
        """
        return self._save(8, serv_params_list)

    def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
        """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).

        Request type 9. No handler is implemented for it in this version.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            ServiceRequest: whatever ``_save()`` returns (annotated as the request).
        """
        return self._save(9, serv_params_list)

    def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
        """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)

        Request type 10. No handler is implemented for it in this version.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
            num (int, optional): stored with the request; presumably how many finished coroutines to wait for. Defaults to 1.
            measure_time (bool, optional): stored with the request; meaning not visible in this module. Defaults to False.

        Returns:
            ServiceRequest: whatever ``_save()`` returns (annotated as the request).
        """
        return self._save(10, serv_params_list, num, measure_time)

    def serv_and_forget_single(self, serv_params: ServParams) -> None:
        """Creates a coroutine for a service request and returns immediately.

        Request type 11. No handler is implemented for it in this version.

        Args:
            serv_params (ServParams): descriptor of the service call to run.

        Returns:
            Whatever ``_save()`` returns (annotated ``None``).
        """
        return self._save(11, serv_params)

    def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
        """Creates a coroutine for each service request and returns immediately.

        Request type 12. No handler is implemented for it in this version.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            Whatever ``_save()`` returns (annotated ``None``).
        """
        return self._save(12, serv_params_list)


class SingleMethod(ServiceRequestMethodMixin):
    """Handler for ``WaitCoroRequest.single``: wait until one coroutine is deleted."""

    def __init__(self, service):
        super().__init__(service)
        self.single_called_by: Dict[CoroID, CoroID] = dict()  # key - awaited coroutine; value - requester
        # (id, result, exception). A list, not a set: a tuple holding an
        # unhashable coroutine result (list, dict, ...) cannot be a set member.
        self.new_single_results: List[Tuple[CoroID, Any, Optional[BaseException]]] = list()
        self.result_required_by: Dict[CoroID, bool] = dict()  # key - requester

    def __call__(self, request: WaitCoroRequest, coro_id: CoroID) -> ServiceProcessingResponse:
        """Register the current caller as waiting for ``coro_id``.

        Returns:
            ``(True, None, CoroutineNotFoundError(coro_id))`` when the loop does
            not know ``coro_id``; otherwise ``(False, None, None)`` and the
            response is registered later by ``full_processing_iteration``.
        """
        requester_id: CoroID = self.service.current_caller_coro_info.coro_id
        coro: CoroWrapperBase = self.service._loop.get_coro(coro_id)
        if coro is None:
            return (True, None, CoroutineNotFoundError(coro_id))

        coro.add_on_coro_del_handler(self._on_coro_del_handler)
        self.single_called_by[coro_id] = requester_id
        self.result_required_by[requester_id] = request.result_required
        timeout: Optional[float] = request.timeout
        if timeout is not None:
            def timeout_handler(coro_id: CoroID, kill_on_timeout: bool, tree: bool):
                # Only act if the wait is still pending.
                if coro_id in self.single_called_by:
                    self.new_single_results.append((coro_id, None, TimeoutError(coro_id)))
                    self.service.make_live()
                    # NOTE(review): nesting of this kill under the pending-check was
                    # reconstructed from a flattened listing - confirm against upstream.
                    if kill_on_timeout:
                        kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)

            timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, coro_id, request.kill_on_timeout, request.tree)

        return (False, None, None)

    def full_processing_iteration(self):
        """Deliver every collected result to its requester and clear the buffer."""
        for coro_id, result, exception in self.new_single_results:
            try:
                requester_id: CoroID = self.single_called_by[coro_id]
                if self.result_required_by[requester_id]:
                    # Exact-type checks on purpose: subclasses are left untouched.
                    # The cast's return value is ignored, so it presumably
                    # re-types the exception object in place.
                    if type(exception) is CoroutineNotFoundError:
                        reinterpret_cast(SubCoroutineNotFoundError, exception)
                    elif type(exception) is TimeoutError:
                        reinterpret_cast(SubTimeoutError, exception)

                    self.service.register_response(requester_id, result, exception)
                else:
                    self.service.register_response(requester_id, None, None)

                del self.result_required_by[requester_id]
                del self.single_called_by[coro_id]
            except KeyError:
                # Wait already answered (e.g. timeout and deletion both reported).
                pass

        self.new_single_results = type(self.new_single_results)()

    def in_work(self) -> bool:
        """Return True while there are results waiting to be delivered."""
        return bool(self.new_single_results)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Record the deleted coroutine's outcome if someone is waiting for it."""
        if coro.coro_id in self.single_called_by:
            self.new_single_results.append((coro.coro_id, coro.last_result, coro.exception))
            self.service.make_live()

        return True


class ListMethod(ServiceRequestMethodMixin):
    """Unfinished handler for ``WaitCoroRequest.list``.

    It is instantiated by ``WaitCoro`` but not registered in its dispatch
    table; ``full_processing_iteration`` and ``in_work`` are not implemented.
    """

    def __init__(self, service):
        super().__init__(service)
        # State used by __call__ / _on_coro_del_handler; previously never
        # created, which made both methods fail with AttributeError.
        self.list_called_by: Dict[CoroID, CoroID] = dict()  # key - awaited coroutine; value - requester
        self.list_wait_by_caller: Dict[CoroID, Set[CoroID]] = dict()  # key - requester
        self.new_list_results: List[Tuple[CoroID, Any, Optional[BaseException]]] = list()

    def __call__(self, request: WaitCoroRequest, coro_list: Sequence[CoroID]) -> ServiceProcessingResponse:
        """Register the current caller as waiting for every coroutine id in ``coro_list``.

        Returns ``(True, None, CoroutineNotFoundError(coro_id))`` at the first
        unknown id (ids registered before it stay registered), otherwise
        ``(False, None, None)``.
        """
        requester_id = self.service.current_caller_coro_info.coro_id
        for coro_id in coro_list:
            coro = self.service._loop.get_coro(coro_id)
            if coro is None:
                return (True, None, CoroutineNotFoundError(coro_id))
            else:
                coro.add_on_coro_del_handler(self._on_coro_del_handler)
                self.list_called_by[coro_id] = requester_id
                if requester_id not in self.list_wait_by_caller:
                    self.list_wait_by_caller[requester_id] = set()

                self.list_wait_by_caller[requester_id].add(coro_id)
                # timeout: Optional[float] = request.timeout
                # if timeout is not None:
                #     def timeout_handler(requester_id: CoroID, coro_id: CoroID, kill_on_timeout: bool, tree: bool):
                #         if coro_id in self.single_called_by:
                #             self.new_single_results.add((coro_id, None, TimeoutError(coro_id)))
                #             self.service.make_live()
                #             if kill_on_timeout:
                #                 kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)

                #     timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, requester_id, coro_id, request.kill_on_timeout, request.tree)


        self.service.make_dead()
        return (False, None, None)

    def full_processing_iteration(self):
        raise NotImplementedError

    def in_work(self) -> bool:
        raise NotImplementedError

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Record the deleted coroutine's outcome if a list-wait references it."""
        # Checks this handler's own registry (the original looked at
        # `single_called_by`, which does not exist on this class).
        if coro.coro_id in self.list_called_by:
            self.new_list_results.append((coro.coro_id, coro.last_result, coro.exception))
            self.service.make_live()

        return True


class PutListMethod(ServiceRequestMethodMixin):
    """Handler for ``WaitCoroRequest.put_list``: create coroutines and wait for all of them."""

    def __init__(self, service):
        super().__init__(service)
        self.called_by: Dict[CoroID, CoroID] = dict()  # key - created coroutine; value - requester
        self.caller_waiting_set: Dict[CoroID, Set[CoroID]] = dict()  # requester -> coroutines still running
        self.coro_indexes: Dict[CoroID, Dict[CoroID, int]] = dict()  # requester -> coroutine -> slot in caller_results
        self.caller_results: Dict[CoroID, List] = dict()  # requester -> per-coroutine (id, result, exception)
        self.ready_requesters: Set[CoroID] = set()  # requesters whose results are complete

    def __call__(self, request: WaitCoroRequest, coro_list: Sequence[PutSingleCoroParams]) -> ServiceProcessingResponse:
        """Create one coroutine per item of ``coro_list`` on behalf of the caller.

        Returns:
            ``(True, results, exception)`` if the surrounding setup failed,
            ``(True, results, None)`` if no coroutine could be created
            (``results`` then holds ``(None, exception)`` pairs), otherwise
            ``(False, None, None)``; the final list is registered by
            ``full_processing_iteration`` once every coroutine is deleted or
            the timeout fires.
        """
        coroutines_list = list()
        results = list()
        requester_id = self.service.current_caller_coro_info.coro_id
        try:
            put_coro: PutCoro = self.service._loop.get_service_instance(PutCoro)
            for coro_request in coro_list:
                exception = None
                result_coro_id = None
                try:
                    coro_worker, args, kwargs = coro_request()
                    coro: CoroWrapperBase = put_coro.put_from_other_service(requester_id, coro_worker, *args, **kwargs)
                    coroutines_list.append(coro)
                    result_coro_id = coro.coro_id
                except:
                    exception = get_exception()

                results.append((result_coro_id, exception))
        except:
            return True, results, get_exception()
        else:
            if not coroutines_list:
                return True, results, None

        if requester_id not in self.caller_waiting_set:
            self.caller_waiting_set[requester_id] = set()
        if requester_id not in self.coro_indexes:
            self.coro_indexes[requester_id] = dict()
        if requester_id not in self.caller_results:
            self.caller_results[requester_id] = [
                None] * len(coroutines_list)
        for index, coro in enumerate(coroutines_list):
            coro_id = coro.coro_id
            self.called_by[coro_id] = requester_id
            self.caller_waiting_set[requester_id].add(coro_id)
            self.coro_indexes[requester_id][coro_id] = index
            coro.add_on_coro_del_handler(self._on_coro_del_handler)

        timeout: Optional[float] = request.timeout
        if timeout is not None:
            def timeout_handler(requester_id: CoroID, kill_on_timeout: bool, tree: bool):
                if requester_id not in self.caller_waiting_set:
                    return

                caller_waiting_set = self.caller_waiting_set[requester_id]
                del self.caller_waiting_set[requester_id]
                for coro_id in caller_waiting_set:
                    del self.called_by[coro_id]
                    index = self.coro_indexes[requester_id][coro_id]
                    del self.coro_indexes[requester_id][coro_id]
                    # Report the timeout itself. The original read `coro.exception`
                    # from the enclosing scope, i.e. from the last coroutine of the
                    # registration loop, not from the one that timed out.
                    self.caller_results[requester_id][index] = (coro_id, None, TimeoutError(coro_id))
                    if kill_on_timeout:
                        kill_coro_on(get_interface_and_loop_with_explicit_loop(self.service._loop), coro_id, tree)

                del self.coro_indexes[requester_id]
                self.ready_requesters.add(requester_id)
                self.service.make_live()

            timer_func_run_on(get_interface_and_loop_with_explicit_loop(self.service._loop), timeout, timeout_handler, requester_id, request.kill_on_timeout, request.tree)

        return False, None, None

    def full_processing_iteration(self):
        """Register the collected result list for every requester that is ready."""
        ready_requesters_buff = self.ready_requesters
        self.ready_requesters = type(ready_requesters_buff)()
        for requester_id in ready_requesters_buff:
            self.service.register_response(requester_id, self.caller_results[requester_id], None)
            del self.caller_results[requester_id]

    def in_work(self) -> bool:
        """Return True while some requester's results are waiting to be delivered."""
        return bool(self.ready_requesters)

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Store the deleted coroutine's outcome; mark its requester ready when it was the last one."""
        coro_id = coro.coro_id
        if coro_id in self.called_by:
            requester_id = self.called_by[coro_id]
            del self.called_by[coro_id]
            self.caller_waiting_set[requester_id].remove(coro_id)
            if not self.caller_waiting_set[requester_id]:
                del self.caller_waiting_set[requester_id]

            index = self.coro_indexes[requester_id][coro_id]
            del self.coro_indexes[requester_id][coro_id]
            self.caller_results[requester_id][index] = (coro.coro_id, coro.last_result, coro.exception)
            # NOTE(review): grouping of the next three statements under this check was
            # reconstructed from a flattened listing - confirm against upstream.
            if not self.coro_indexes[requester_id]:
                del self.coro_indexes[requester_id]
                self.ready_requesters.add(requester_id)
                self.service.make_live()

        return True


class WaitCoro(Service):
    """Service that dispatches ``WaitCoroRequest`` requests to their handlers."""

    def __init__(self, loop):
        super(WaitCoro, self).__init__(loop)
        self._single = SingleMethod(self)
        self._list = ListMethod(self)
        self._put_list = PutListMethod(self)

        # Request type -> handler. Types 8-12 exist on WaitCoroRequest (serv_*),
        # so they are listed explicitly as not implemented.
        self._request_workers = {
            0: self._single,
            1: self.not_implemented,
            2: self.not_implemented,
            3: self.not_implemented,
            4: self.not_implemented,
            5: self._put_list,
            6: self.not_implemented,
            7: self.not_implemented,
            8: self.not_implemented,
            9: self.not_implemented,
            10: self.not_implemented,
            11: self.not_implemented,
            12: self.not_implemented,
        }

    def single_task_registration_or_immediate_processing(self, request: Optional[WaitCoroRequest]) -> ServiceProcessingResponse:
        """Resolve ``request`` through the dispatch table; ``None`` yields ``WrongServiceRequestError``."""
        if request is not None:
            return self.resolve_request(request)

        return (True, None, WrongServiceRequestError())

    def full_processing_iteration(self):
        """Flush pending results of both implemented handlers, then go idle if nothing is left."""
        self._single.full_processing_iteration()
        self._put_list.full_processing_iteration()
        if not self.in_work():
            self.make_dead()

    def in_work(self) -> bool:
        """Return ``thrifty_in_work`` of whether any implemented handler has pending results."""
        result: bool = self._single.in_work() or self._put_list.in_work()
        return self.thrifty_in_work(result)

    def not_implemented(self, *args, **kwargs):
        """Placeholder handler: accepts any handler arguments and raises ``NotImplementedError``."""
        raise NotImplementedError


WaitCoroRequest.default_service_type = WaitCoro
class WaitCoro(Service):
    """Service that dispatches ``WaitCoroRequest`` requests to their handlers.

    Only request types 0 (``single``) and 5 (``put_list``) have a real handler;
    every other type is routed to ``not_implemented``.
    """

    def __init__(self, loop):
        super(WaitCoro, self).__init__(loop)
        self._single = SingleMethod(self)
        self._list = ListMethod(self)
        self._put_list = PutListMethod(self)

        # Request type -> handler. Types 8-12 exist on WaitCoroRequest (serv_*),
        # so they are listed explicitly as not implemented.
        self._request_workers = {
            0: self._single,
            1: self.not_implemented,
            2: self.not_implemented,
            3: self.not_implemented,
            4: self.not_implemented,
            5: self._put_list,
            6: self.not_implemented,
            7: self.not_implemented,
            8: self.not_implemented,
            9: self.not_implemented,
            10: self.not_implemented,
            11: self.not_implemented,
            12: self.not_implemented,
        }

    def single_task_registration_or_immediate_processing(self, request: Optional[WaitCoroRequest]) -> ServiceProcessingResponse:
        """Resolve ``request`` through the dispatch table; ``None`` yields ``WrongServiceRequestError``."""
        if request is not None:
            return self.resolve_request(request)

        return (True, None, WrongServiceRequestError())

    def full_processing_iteration(self):
        """Flush pending results of both implemented handlers, then go idle if nothing is left."""
        self._single.full_processing_iteration()
        self._put_list.full_processing_iteration()
        if not self.in_work():
            self.make_dead()

    def in_work(self) -> bool:
        """Return ``thrifty_in_work`` of whether any implemented handler has pending results."""
        result: bool = self._single.in_work() or self._put_list.in_work()
        return self.thrifty_in_work(result)

    def not_implemented(self, *args, **kwargs):
        """Placeholder handler: accepts any handler arguments and raises ``NotImplementedError``.

        The sibling handlers in the dispatch table are called with the request
        and its arguments, so a zero-argument placeholder would fail with
        ``TypeError`` before it could raise the intended error.
        """
        raise NotImplementedError
def __init__(self, loop):
    """Create the per-request-kind handlers and the request-type dispatch table."""
    super(WaitCoro, self).__init__(loop)
    self._single = SingleMethod(self)
    self._list = ListMethod(self)
    self._put_list = PutListMethod(self)

    # Types 0..7 default to the placeholder; the two implemented kinds
    # (0 - single, 5 - put_list) then replace their entries.
    dispatch = dict.fromkeys(range(8), self.not_implemented)
    dispatch[0] = self._single
    dispatch[5] = self._put_list
    self._request_workers = dispatch
def in_work(self) -> bool:
    """Tell the scheduler whether this service still has results to deliver.

    The single-wait handler is asked first; the put-list handler is only
    consulted when the first one reports nothing pending. The combined
    answer is passed through ``thrifty_in_work`` and that value is returned.
    """
    has_pending = self._single.in_work() or self._put_list.in_work()
    return self.thrifty_in_work(has_pending)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
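Raises: NotImplementedError: documented on the inherited method; the WaitCoro override shown above does not raise it itself.
Returns: bool: the value of thrifty_in_work() applied to whether the single-wait or put-list handler still has results waiting to be delivered.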
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
class PutSingleCoroParams:
    """Describe one coroutine to create: a worker plus the arguments for it.

    Calling the instance returns the stored ``(coro_worker, args, kwargs)``
    triple.
    """

    def __init__(self, coro_worker: AnyWorker, *args, **kwargs) -> None:
        # Positional and keyword arguments are kept exactly as received.
        self.coro_worker: AnyWorker = coro_worker
        self.args: Tuple = args
        self.kwargs: Dict = kwargs

    def __call__(self) -> Any:
        """Return ``(coro_worker, args, kwargs)``."""
        worker_spec = (self.coro_worker, self.args, self.kwargs)
        return worker_spec
class WaitCoroRequest(ServiceRequest):
    """Request builder for the ``WaitCoro`` service.

    Every method records a numeric request type together with its arguments
    via ``self._save(...)`` and returns whatever ``_save`` returns.

    Args:
        timeout: seconds before the handler reports a timeout; ``None`` means no timer.
        kill_on_timeout: whether awaited coroutine(s) are killed when the timeout fires.
        tree: forwarded to the kill call made on timeout.
        result_required: when false, the ``single`` handler answers ``(None, None)``
            instead of the awaited coroutine's result / exception.
    """

    def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
        super().__init__()
        # Handlers of this service take the request object as their first argument.
        self.provide_to_request_handler = True
        self.timeout: Optional[float] = timeout
        self.kill_on_timeout: bool = kill_on_timeout
        self.tree: bool = tree
        self.result_required: bool = result_required

    # --- wait for coroutines that already exist ---------------------------

    def single(self, coro_id: CoroID) -> Union[Any, None]:
        """Wait for one existing coroutine (request type 0)."""
        return self._save(0, coro_id)

    def list(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
        """Request type 1. No handler is implemented for it in this version of WaitCoro."""
        return self._save(1, coro_list)

    def atomic(self, coro_list: Sequence[CoroID]) -> ServiceRequest:
        """Request type 2. No handler is implemented for it in this version of WaitCoro."""
        return self._save(2, coro_list)

    def fastest(self, coro_list: Sequence[CoroID], num: int = 1, measure_time: bool = False) -> ServiceRequest:
        """Request type 3. No handler is implemented for it in this version of WaitCoro."""
        return self._save(3, coro_list, num, measure_time)

    # --- create coroutines, then wait for them ----------------------------

    def put_single(self, coro_worker: Worker, *args, **kwargs) -> ServiceRequest:
        """Request type 4. No handler is implemented for it in this version of WaitCoro."""
        return self._save(4, coro_worker, *args, **kwargs)

    def put_list(self, coro_list: Sequence[PutSingleCoroParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
        """Create one coroutine per item and wait for all of them (request type 5)."""
        return self._save(5, coro_list)

    def put_atomic(self, coro_list: Sequence[PutSingleCoroParams]) -> ServiceRequest:
        """Request type 6. No handler is implemented for it in this version of WaitCoro."""
        return self._save(6, coro_list)

    def put_fastest(self, coro_list: Sequence[PutSingleCoroParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
        """Request type 7. No handler is implemented for it in this version of WaitCoro."""
        return self._save(7, coro_list, num, measure_time)

    # --- run service requests in their own coroutines ---------------------

    def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
        """Creates a coroutine for each service request and waits for the result of each of them.

        Request type 8. No handler is implemented for it in this version of WaitCoro.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            List[Tuple[CoroID, Any, Optional[Exception]]]: as annotated, one
            ``(coroutine id, result, exception)`` tuple per service request.
        """
        return self._save(8, serv_params_list)

    def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
        """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).

        Request type 9. No handler is implemented for it in this version of WaitCoro.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            ServiceRequest: whatever ``_save()`` returns (annotated as the request).
        """
        return self._save(9, serv_params_list)

    def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
        """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)

        Request type 10. No handler is implemented for it in this version of WaitCoro.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
            num (int, optional): stored with the request; presumably how many finished coroutines to wait for. Defaults to 1.
            measure_time (bool, optional): stored with the request; meaning not visible here. Defaults to False.

        Returns:
            ServiceRequest: whatever ``_save()`` returns (annotated as the request).
        """
        return self._save(10, serv_params_list, num, measure_time)

    def serv_and_forget_single(self, serv_params: ServParams) -> None:
        """Creates a coroutine for a service request and returns immediately.

        Request type 11. No handler is implemented for it in this version of WaitCoro.

        Args:
            serv_params (ServParams): descriptor of the service call to run.

        Returns:
            Whatever ``_save()`` returns (annotated ``None``).
        """
        return self._save(11, serv_params)

    def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
        """Creates a coroutine for each service request and returns immediately.

        Request type 12. No handler is implemented for it in this version of WaitCoro.

        Args:
            serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

        Returns:
            Whatever ``_save()`` returns (annotated ``None``).
        """
        return self._save(12, serv_params_list)
def __init__(self, timeout: Optional[float] = None, kill_on_timeout: bool = True, tree: bool = True, result_required: bool = True):
    """Store the waiting options that apply to whichever request method is called next.

    Args:
        timeout: seconds before the handler reports a timeout; ``None`` means no timer.
        kill_on_timeout: whether awaited coroutine(s) are killed when the timeout fires.
        tree: forwarded to the kill call made on timeout.
        result_required: whether the awaited coroutine's result / exception
            should be delivered to the requester.
    """
    super().__init__()
    # Handlers of this service take the request object as their first argument.
    self.provide_to_request_handler = True
    self.timeout: Optional[float] = timeout
    self.kill_on_timeout: bool = kill_on_timeout
    self.tree: bool = tree
    self.result_required: bool = result_required
def serv_list(self, serv_params_list: Sequence[ServParams]) -> List[Tuple[CoroID, Any, Optional[Exception]]]:
    """Creates a coroutine for each service request and waits for the result of each of them.

    Stored as request type 8.

    Args:
        serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

    Returns:
        List[Tuple[CoroID, Any, Optional[Exception]]]: as annotated, one
        ``(coroutine id, result, exception)`` tuple per service request.
    """
    request_type = 8
    return self._save(request_type, serv_params_list)
Creates a coroutine for each service request and waits for the result of each of them.
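Args: serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
Returns: List[Tuple[CoroID, Any, Optional[Exception]]]: as annotated, one (coroutine id, result, exception) tuple per service request.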
def serv_atomic(self, serv_params_list: Sequence[ServParams]) -> ServiceRequest:
    """Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).

    Stored as request type 9.

    Args:
        serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

    Returns:
        ServiceRequest: whatever ``_save()`` returns (annotated as the request).
    """
    request_type = 9
    return self._save(request_type, serv_params_list)
Creates a coroutine for each service request and waits for the result of each of them. If one of the coroutines fails, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here).
Args: serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
Returns: ServiceRequest: whatever _save() returns (annotated as the request).
def serv_fastest(self, serv_params_list: Sequence[ServParams], num: int=1, measure_time: bool=False) -> ServiceRequest:
    """Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)

    Stored as request type 10.

    Args:
        serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
        num (int, optional): stored with the request; presumably how many finished coroutines to wait for. Defaults to 1.
        measure_time (bool, optional): stored with the request; meaning not visible here. Defaults to False.

    Returns:
        ServiceRequest: whatever ``_save()`` returns (annotated as the request).
    """
    request_type = 10
    return self._save(request_type, serv_params_list, num, measure_time)
Creates a coroutine for each service request and waits for the result of each of them. When one of coroutines finished, the others are killed which may lead to cancel request processing in some services (see documentation of the service you are trying to use here)
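Args: serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create. num (int, optional): stored with the request; presumably how many finished coroutines to wait for. Defaults to 1. measure_time (bool, optional): stored with the request; its effect is not shown on this page. Defaults to False.
Returns: ServiceRequest: whatever _save() returns (annotated as the request).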
def serv_and_forget_single(self, serv_params: ServParams) -> None:
    """Creates a coroutine for a service request and returns immediately.

    Stored as request type 11.

    Args:
        serv_params (ServParams): descriptor of the service call to run.

    Returns:
        Whatever ``_save()`` returns (annotated ``None``).
    """
    request_type = 11
    return self._save(request_type, serv_params)
Creates a coroutine for a service request and returns immediately.
Args: serv_params (ServParams): descriptor of the service call to run.
Returns: None (as annotated); the method returns whatever _save() returns.
def serv_and_forget_list(self, serv_params_list: Sequence[ServParams]) -> None:
    """Creates a coroutine for each service request and returns immediately.

    Stored as request type 12.

    Args:
        serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.

    Returns:
        Whatever ``_save()`` returns (annotated ``None``).
    """
    request_type = 12
    return self._save(request_type, serv_params_list)
Creates a coroutine for each service request and returns immediately.
Args: serv_params_list (Sequence[ServParams]): service call descriptors, one per coroutine to create.
Returns: None (as annotated); the method returns whatever _save() returns.
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- interface
- i
- async_interface
- ai
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args