cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = [ 38 'CoroPriority', 'ThisCoroWasRequestedToBeKilled', 'LoopYieldPrioritySchedulerRequest', 'LoopYieldManagedBase', 'LoopYieldManaged', 39 'LoopYieldManagedAsync', 'FakeLoopYieldManaged', 'LoopYieldPriorityScheduler', 'get_loop_yield', 40 'gly', 'aget_loop_yield', 'agly', 'LoopYieldManagedAsyncExternal', 'FakeLoopYieldManagedAsync', 'external_aget_loop_yield', 'eagly' 41] 42 43from asyncio.events import Handle 44from cengal.parallel_execution.coroutines.coro_scheduler import * 45from cengal.parallel_execution.coroutines.coro_tools.await_coro import * 46from enum import Enum 47from typing import Callable, Dict, Tuple, Union, Type, Optional, Any, Set, Hashable 48from cengal.time_management.repeat_for_a_time import Tracer, TimeLimitIsTooSmall 49from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence 50from async_generator import asynccontextmanager, async_generator, yield_ 51import asyncio 52import sys 53 54 55MIN_TIME = 0.000001 56 57 58class CoroPriority(Enum): 59 high = 0 60 normal = 1 61 low = 2 62 63 64class LoopYieldPrioritySchedulerRequest(ServiceRequest): 65 def register(self, default_priority: CoroPriority) -> 'LoopYieldManagedBase': 66 return self._save(0, default_priority) 67 68 def setup(self, max_delay: float) -> None: 69 return self._save(1, max_delay) 70 71 def change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority) -> None: 72 return self._save(2, new_priority, old_priority) 73 74 def get(self) -> 'LoopYieldManagedBase': 75 return self._save(3) 76 77 def register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal': 78 return self._save(4, asyncio_loop, default_priority) 79 80 def register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal': 81 return self._save(5, asyncio_loop, task, default_priority) 82 83 def change_priority_external(self, loop_yield: 'LoopYieldManagedAsyncExternal', new_priority: CoroPriority, old_priority: CoroPriority) -> None: 84 return self._save(6, loop_yield, new_priority, old_priority) 85 86 def del_external(self, loop_yield: 'LoopYieldManagedAsyncExternal') -> None: 87 return self._save(7, loop_yield) 88 89 def request_coro_kill(self, coro_id: CoroID) -> None: 90 """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service 91 92 Args: 93 coro_id (CoroID): _description_ 94 95 Returns: 96 ServiceRequest: _description_ 97 """ 98 return self._save(8, coro_id) 99 100 def kill_coro(self, coro_id: CoroID) -> None: 101 """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None). 102 103 Args: 104 coro_id (CoroID): _description_ 105 106 Returns: 107 ServiceRequest: _description_ 108 """ 109 return self._save(9, coro_id) 110 111 112class LoopYieldManagedBase: 113 def __init__(self, interface: Interface, time_atom: ValueExistence, 114 default_priority: CoroPriority, service: Type[Service]): 115 self._interface = None 116 self.interface = interface 117 self.time_atom = time_atom 118 self.default_priority = default_priority 119 self.priority = self.default_priority 120 self.service = service 121 self.tracer = None 122 123 @property 124 def interface(self): 125 if self._interface is None: 126 self._interface = current_interface() 127 128 return self._interface 129 130 @interface.setter 131 def interface(self, value): 132 self._interface = value 133 134 135class LoopYieldManaged(LoopYieldManagedBase): 136 def __init__(self, interface: Interface, time_atom: ValueExistence, 137 default_priority: CoroPriority, service: Type[Service]): 138 super(LoopYieldManaged, self).__init__(interface, time_atom, default_priority, service) 139 140 def __call__(self, priority: Optional[CoroPriority] = None): 141 if priority is None: 142 priority = self.default_priority 143 144 if priority != self.priority: 145 self.interface = None 146 self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority, 147 self.priority)) 148 self.priority = priority 149 try: 150 self.tracer = Tracer(self.time_atom.result) 151 except TimeLimitIsTooSmall as ex: 152 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 153 154 if self.tracer is None: 155 exception = None 156 try: 157 self.tracer = Tracer(self.time_atom.result) 158 except TimeLimitIsTooSmall as ex: 159 exception = ex 160 161 if exception is not None: 162 try: 163 self.tracer = Tracer(exception.min_time if exception.min_time is not None else MIN_TIME) 164 except TimeLimitIsTooSmall as ex: 165 print(ex) 166 167 if self.tracer is not None: 168 if not self.tracer.iter(): 169 self.tracer = None 170 self.interface = None 171 self.interface(self.service) 172 173 174class LoopYieldManagedAsync(LoopYieldManagedBase): 175 def __init__(self, interface: Interface, time_atom: ValueExistence, 176 default_priority: CoroPriority, service: Type[Service]): 177 super(LoopYieldManagedAsync, self).__init__(interface, time_atom, default_priority, service) 178 179 async def __call__(self, priority: Optional[CoroPriority] = None): 180 if priority is None: 181 priority = self.default_priority 182 if priority != self.priority: 183 self.interface = None 184 await self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority, 185 self.priority)) 186 self.priority = priority 187 try: 188 self.tracer = Tracer(self.time_atom.result) 189 except TimeLimitIsTooSmall as ex: 190 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 191 if self.tracer is None: 192 try: 193 self.tracer = Tracer(self.time_atom.result) 194 except TimeLimitIsTooSmall as ex: 195 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 196 if not self.tracer.iter(): 197 self.tracer = None 198 self.interface = None 199 await self.interface(self.service) 200 201 202class LoopYieldManagedAsyncExternal(LoopYieldManagedBase): 203 def __init__(self, task_id: int, time_atom: ValueExistence, 204 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop): 205 super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service) 206 self.task_id = task_id 207 self.coro_scheduler = coro_scheduler 208 self.asyncio_loop = asyncio_loop 209 self.asyncio_task: asyncio.Task = None 210 self.on_done_asyncio_coro: Callable = None 211 212 async def __call__(self, priority: Optional[CoroPriority] = None): 213 if priority is None: 214 priority = self.default_priority 215 if priority != self.priority: 216 await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service, 217 LoopYieldPrioritySchedulerRequest().change_priority_external(self, priority, self.priority)) 218 self.priority = priority 219 try: 220 self.tracer = Tracer(self.time_atom.result) 221 except TimeLimitIsTooSmall as ex: 222 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 223 if self.tracer is None: 224 try: 225 self.tracer = Tracer(self.time_atom.result) 226 except TimeLimitIsTooSmall as ex: 227 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 228 if not self.tracer.iter(): 229 self.tracer = None 230 await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service) 231 232 233class FakeLoopYieldManaged: 234 def __call__(self, priority: Optional[CoroPriority] = None): 235 pass 236 237class FakeLoopYieldManagedAsync: 238 async def __call__(self, priority: Optional[CoroPriority] = None): 239 pass 240 241 242class ThisCoroWasRequestedToBeKilled(Exception): 243 pass 244 245 246class LoopYieldPriorityScheduler(TypedService[None], EntityStatsMixin): 247 def __init__(self, loop: CoroSchedulerType): 248 super(LoopYieldPriorityScheduler, self).__init__(loop) 249 250 # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global) # Todo: switch to local coro del handler 251 252 self._request_workers = { 253 0: self._on_register, 254 1: self._on_setup, 255 2: self._on_change_priority, 256 3: self._on_get, 257 4: self._on_register_external, 258 5: self._on_register_external_asyncio_task, 259 6: self._on_change_priority_external, 260 7: self._on_del_external, 261 8: self._request_coro_kill, 262 9: self._kill_coro, 263 } 264 265 self.sigma = { 266 0: 0.6827, 267 1: 0.9545 - 0.6827, 268 2: 1.0 - 0.9545, 269 } 270 271 self.max_delay = 0.01 272 self.max_delays = { 273 0: 0.0, 274 1: 0.0, 275 2: 0.0, 276 } 277 self.compute_delays() 278 279 self.all_yield_objects = dict() # type: Dict[CoroID, LoopYieldManagedBase] 280 self.task_counter = Counter() 281 while self.task_counter.get() <= 0: 282 pass 283 284 self.yields_num: int = 0 285 self.coroutines_requested_to_be_deleted: Set[CoroID] = set() 286 self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict() 287 self.finished_waiters_for_coro_kill: Set[CoroID] = set() 288 self.asyncio_task_ids: Dict[Hashable, int] = dict() 289 290 self.yields_by_priority: Dict[CoroPriority, int] = { 291 CoroPriority.high: 0, 292 CoroPriority.normal: 0, 293 CoroPriority.low: 0, 294 } 295 296 self.time_atom_by_priority = { 297 CoroPriority.high: ValueExistence(True, self.max_delays[0]), 298 CoroPriority.normal: ValueExistence(True, self.max_delays[1]), 299 CoroPriority.low: ValueExistence(True, self.max_delays[2]), 300 } 301 302 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 303 coroutines_requested_to_be_killed = self.coroutines_requested_to_be_deleted | set(self.coroutines_requested_to_be_deleted_by_waiters.keys()) 304 return type(self).__name__, { 305 'task counter': self.task_counter._index, 306 'yields num': self.yields_num, 307 'max_delay': self.max_delay, 308 'max_delays': self.max_delays, 309 'affected coroutines num': len(self.all_yield_objects), 310 'coroutines num by priority': { 311 'high': self.yields_by_priority[CoroPriority.high], 312 'normal': self.yields_by_priority[CoroPriority.normal], 313 'low': self.yields_by_priority[CoroPriority.low], 314 }, 315 'time atoms by priority': { 316 'high': self.time_atom_by_priority[CoroPriority.high].result, 317 'normal': self.time_atom_by_priority[CoroPriority.normal].result, 318 'low': self.time_atom_by_priority[CoroPriority.low].result, 319 }, 320 'coroutines requested to be killed': { 321 'num': len(coroutines_requested_to_be_killed), 322 'list': coroutines_requested_to_be_killed, 323 } 324 } 325 326 def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None 327 ) -> Tuple[bool, Any, None]: 328 self.yields_num += 1 329 coro_should_be_killed = False 330 coro_id = self.current_caller_coro_info.coro_id 331 if coro_id in self.coroutines_requested_to_be_deleted: 332 coro_should_be_killed = True 333 334 if coro_id in self.coroutines_requested_to_be_deleted_by_waiters: 335 self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id] 336 self.make_live() 337 coro_should_be_killed = True 338 339 if request is not None: 340 if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]): 341 return True, None, ThisCoroWasRequestedToBeKilled 342 343 return self.resolve_request(request) 344 345 if coro_should_be_killed: 346 return True, None, ThisCoroWasRequestedToBeKilled 347 348 return True, None, None 349 350 def full_processing_iteration(self): 351 for coro_id in self.finished_waiters_for_coro_kill: 352 self.register_response(coro_id, None) 353 354 self.compute_time_atoms() 355 356 def compute_time_atoms(self): 357 if self.all_yield_objects: 358 top_sigma = 0 359 360 if self.yields_by_priority[CoroPriority.high]: 361 top_sigma += 1 362 363 if self.yields_by_priority[CoroPriority.normal]: 364 top_sigma += 1 365 366 if self.yields_by_priority[CoroPriority.low]: 367 top_sigma += 1 368 369 median_time_atom = 1 / len(self.all_yield_objects) # !!! Possible division by zero! Conditional must not be removed! 370 if 1 == top_sigma: 371 sigma_0_time_atom = min(median_time_atom, self.max_delays[0]) 372 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 373 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 374 self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom 375 elif 2 == top_sigma: 376 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 377 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 378 if self.yields_by_priority[CoroPriority.high]: 379 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 380 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 381 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 382 else: 383 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 384 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 385 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 386 elif 3 == top_sigma: 387 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 388 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 389 sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2]) 390 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 391 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 392 self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom 393 394 self.make_dead() 395 396 def in_work(self) -> bool: 397 result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects) 398 return self.thrifty_in_work(result) 399 400 def compute_delays(self): 401 self.max_delays = { 402 0: self.max_delay * self.sigma[0], 403 1: self.max_delay * self.sigma[1], 404 2: self.max_delay * self.sigma[2], 405 } 406 407 def _on_register(self, default_priority: CoroPriority): 408 task_id = self.current_caller_coro_info.coro_id 409 if task_id in self.all_yield_objects: 410 loop_yield = self.all_yield_objects[task_id] 411 else: 412 interface: Interface = self.current_caller_coro_info.coro.interface 413 if isinstance(interface, InterfaceGreenlet): 414 loop_yield = LoopYieldManaged(interface, 415 self.time_atom_by_priority[default_priority], 416 default_priority, 417 type(self)) 418 elif isinstance(interface, InterfaceAsyncAwait): 419 loop_yield = LoopYieldManagedAsync(interface, 420 self.time_atom_by_priority[default_priority], 421 default_priority, 422 type(self)) 423 else: 424 raise NotImplementedError 425 426 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 427 self.all_yield_objects[interface.coro_id] = loop_yield 428 self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler) 429 self.yields_by_priority[loop_yield.priority] += 1 430 self.make_live() 431 432 return True, loop_yield, None 433 434 def _on_setup(self, max_delay: float): 435 self.max_delay = max_delay 436 self.compute_delays() 437 # self.compute_time_atoms() 438 self.make_live() 439 return True, None, None 440 441 def _on_change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority): 442 self.yields_by_priority[old_priority] -= 1 443 self.yields_by_priority[new_priority] += 1 444 loop_yield = self.all_yield_objects[self.current_caller_coro_info.coro_id] 445 loop_yield.time_atom = self.time_atom_by_priority[new_priority] 446 self.make_live() 447 return True, None, None 448 449 def _on_get(self): 450 return True, self.all_yield_objects.get(self.current_caller_coro_info.coro_id), None 451 452 def get_yield_object(self, coro_id: CoroID) -> LoopYieldManagedBase: 453 return self.all_yield_objects.get(coro_id) 454 455 def _on_register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority): 456 task_id = -(self.task_counter.get()) 457 loop_yield = LoopYieldManagedAsyncExternal(task_id, 458 self.time_atom_by_priority[default_priority], 459 default_priority, 460 type(self), 461 self._loop, 462 asyncio_loop) 463 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 464 self.all_yield_objects[task_id] = loop_yield 465 self.yields_by_priority[loop_yield.priority] += 1 466 self.make_live() 467 return True, loop_yield, None 468 469 def _on_register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority): 470 asyncio_task_id = id(task) 471 if asyncio_task_id not in self.asyncio_task_ids: 472 self.asyncio_task_ids[asyncio_task_id] = -(self.task_counter.get()) 473 474 task_id = self.asyncio_task_ids[asyncio_task_id] 475 if task_id in self.all_yield_objects: 476 loop_yield = self.all_yield_objects[task_id] 477 else: 478 loop_yield = LoopYieldManagedAsyncExternal(task_id, 479 self.time_atom_by_priority[default_priority], 480 default_priority, 481 type(self), 482 self._loop, 483 asyncio_loop) 484 def on_done_asyncio_coro(future): 485 self._on_del_external(loop_yield) 486 task.remove_done_callback(loop_yield.on_done_asyncio_coro) 487 488 loop_yield.asyncio_task = task 489 loop_yield.on_done_asyncio_coro = on_done_asyncio_coro 490 task.add_done_callback(loop_yield.on_done_asyncio_coro) 491 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 492 self.all_yield_objects[task_id] = loop_yield 493 self.yields_by_priority[loop_yield.priority] += 1 494 self.make_live() 495 496 return True, loop_yield, None 497 498 def _on_change_priority_external(self, loop_yield: LoopYieldManagedAsyncExternal, new_priority: CoroPriority, old_priority: CoroPriority): 499 self.yields_by_priority[old_priority] -= 1 500 self.yields_by_priority[new_priority] += 1 501 loop_yield.time_atom = self.time_atom_by_priority[new_priority] 502 self.make_live() 503 return True, None, None 504 505 def _on_del_external(self, loop_yield: LoopYieldManagedAsyncExternal): 506 task_id = loop_yield.task_id 507 if task_id in self.all_yield_objects: 508 priority = self.all_yield_objects[task_id].priority 509 del self.all_yield_objects[task_id] 510 self.yields_by_priority[priority] -= 1 511 self.make_live() 512 return True, None, None 513 514 def _request_coro_kill(self, coro_id: CoroID) -> ServiceProcessingResponse: 515 self.coroutines_requested_to_be_deleted.add(coro_id) 516 return True, None, None 517 518 def _kill_coro(self, coro_id: CoroID) -> ServiceProcessingResponse: 519 waiter_coro_id = self.current_caller_coro_info 520 if coro_id not in self.coroutines_requested_to_be_deleted_by_waiters: 521 self.coroutines_requested_to_be_deleted_by_waiters[coro_id] = set() 522 523 self.coroutines_requested_to_be_deleted_by_waiters[coro_id].add(waiter_coro_id) 524 return False, None, None 525 526 def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool: 527 coro_id = coro.coro_id 528 if coro_id in self.all_yield_objects: 529 priority = self.all_yield_objects[coro_id].priority 530 del self.all_yield_objects[coro_id] 531 self.yields_by_priority[priority] -= 1 532 self.make_live() 533 return False 534 535 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 536 coro_id = coro.coro_id 537 priority = self.all_yield_objects[coro_id].priority 538 del self.all_yield_objects[coro_id] 539 self.yields_by_priority[priority] -= 1 540 self.make_live() 541 return False 542 543 544LoopYieldPrioritySchedulerRequest.default_service_type = LoopYieldPriorityScheduler 545 546 547def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]: 548 loop = CoroScheduler.current_loop() 549 if loop is None: 550 return FakeLoopYieldManaged() # running not from inside the loop 551 552 interface = loop.current_interface() 553 if interface is None: 554 return FakeLoopYieldManaged() # running from Service 555 556 # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 557 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 558 if ly is None: 559 ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 560 561 return ly 562 563 564gly = get_loop_yield 565 566 567async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]: 568 loop = CoroScheduler.current_loop() 569 if loop is None: 570 return FakeLoopYieldManagedAsync() # running not from inside the loop 571 572 interface = loop.current_interface() 573 if interface is None: 574 return FakeLoopYieldManagedAsync() # running from Service 575 576 # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 577 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 578 if ly is None: 579 ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 580 581 return ly 582 583 584agly = aget_loop_yield 585 586 587@asynccontextmanager 588@async_generator 589async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None): 590 if coro_scheduler is None: 591 coro_scheduler = CoroScheduler.current_loop() 592 593 if coro_scheduler is None: 594 await yield_(FakeLoopYieldManagedAsync()) # can not determine coro scheduler loop 595 else: 596 if asyncio_loop is None: 597 asyncio_loop = asyncio.get_event_loop() 598 599 if (3, 7) <= sys.version_info: 600 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 601 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 602 LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority)) 603 else: 604 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 605 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 606 LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority)) 607 608 try: 609 await yield_(ly) 610 finally: 611 if (3, 7) > sys.version_info: 612 await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 613 LoopYieldPrioritySchedulerRequest().del_external(ly)) 614 615eagly = external_aget_loop_yield
An enumeration.
Inherited Members
- enum.Enum
- name
- value
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
65class LoopYieldPrioritySchedulerRequest(ServiceRequest): 66 def register(self, default_priority: CoroPriority) -> 'LoopYieldManagedBase': 67 return self._save(0, default_priority) 68 69 def setup(self, max_delay: float) -> None: 70 return self._save(1, max_delay) 71 72 def change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority) -> None: 73 return self._save(2, new_priority, old_priority) 74 75 def get(self) -> 'LoopYieldManagedBase': 76 return self._save(3) 77 78 def register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal': 79 return self._save(4, asyncio_loop, default_priority) 80 81 def register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal': 82 return self._save(5, asyncio_loop, task, default_priority) 83 84 def change_priority_external(self, loop_yield: 'LoopYieldManagedAsyncExternal', new_priority: CoroPriority, old_priority: CoroPriority) -> None: 85 return self._save(6, loop_yield, new_priority, old_priority) 86 87 def del_external(self, loop_yield: 'LoopYieldManagedAsyncExternal') -> None: 88 return self._save(7, loop_yield) 89 90 def request_coro_kill(self, coro_id: CoroID) -> None: 91 """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service 92 93 Args: 94 coro_id (CoroID): _description_ 95 96 Returns: 97 ServiceRequest: _description_ 98 """ 99 return self._save(8, coro_id) 100 101 def kill_coro(self, coro_id: CoroID) -> None: 102 """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None). 103 104 Args: 105 coro_id (CoroID): _description_ 106 107 Returns: 108 ServiceRequest: _description_ 109 """ 110 return self._save(9, coro_id)
90 def request_coro_kill(self, coro_id: CoroID) -> None: 91 """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service 92 93 Args: 94 coro_id (CoroID): _description_ 95 96 Returns: 97 ServiceRequest: _description_ 98 """ 99 return self._save(8, coro_id)
Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service
Args: coro_id (CoroID): _description_
Returns: ServiceRequest: _description_
101 def kill_coro(self, coro_id: CoroID) -> None: 102 """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None). 103 104 Args: 105 coro_id (CoroID): _description_ 106 107 Returns: 108 ServiceRequest: _description_ 109 """ 110 return self._save(9, coro_id)
Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None).
Args: coro_id (CoroID): _description_
Returns: ServiceRequest: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
113class LoopYieldManagedBase: 114 def __init__(self, interface: Interface, time_atom: ValueExistence, 115 default_priority: CoroPriority, service: Type[Service]): 116 self._interface = None 117 self.interface = interface 118 self.time_atom = time_atom 119 self.default_priority = default_priority 120 self.priority = self.default_priority 121 self.service = service 122 self.tracer = None 123 124 @property 125 def interface(self): 126 if self._interface is None: 127 self._interface = current_interface() 128 129 return self._interface 130 131 @interface.setter 132 def interface(self, value): 133 self._interface = value
114 def __init__(self, interface: Interface, time_atom: ValueExistence, 115 default_priority: CoroPriority, service: Type[Service]): 116 self._interface = None 117 self.interface = interface 118 self.time_atom = time_atom 119 self.default_priority = default_priority 120 self.priority = self.default_priority 121 self.service = service 122 self.tracer = None
136class LoopYieldManaged(LoopYieldManagedBase): 137 def __init__(self, interface: Interface, time_atom: ValueExistence, 138 default_priority: CoroPriority, service: Type[Service]): 139 super(LoopYieldManaged, self).__init__(interface, time_atom, default_priority, service) 140 141 def __call__(self, priority: Optional[CoroPriority] = None): 142 if priority is None: 143 priority = self.default_priority 144 145 if priority != self.priority: 146 self.interface = None 147 self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority, 148 self.priority)) 149 self.priority = priority 150 try: 151 self.tracer = Tracer(self.time_atom.result) 152 except TimeLimitIsTooSmall as ex: 153 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 154 155 if self.tracer is None: 156 exception = None 157 try: 158 self.tracer = Tracer(self.time_atom.result) 159 except TimeLimitIsTooSmall as ex: 160 exception = ex 161 162 if exception is not None: 163 try: 164 self.tracer = Tracer(exception.min_time if exception.min_time is not None else MIN_TIME) 165 except TimeLimitIsTooSmall as ex: 166 print(ex) 167 168 if self.tracer is not None: 169 if not self.tracer.iter(): 170 self.tracer = None 171 self.interface = None 172 self.interface(self.service)
Inherited Members
175class LoopYieldManagedAsync(LoopYieldManagedBase): 176 def __init__(self, interface: Interface, time_atom: ValueExistence, 177 default_priority: CoroPriority, service: Type[Service]): 178 super(LoopYieldManagedAsync, self).__init__(interface, time_atom, default_priority, service) 179 180 async def __call__(self, priority: Optional[CoroPriority] = None): 181 if priority is None: 182 priority = self.default_priority 183 if priority != self.priority: 184 self.interface = None 185 await self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority, 186 self.priority)) 187 self.priority = priority 188 try: 189 self.tracer = Tracer(self.time_atom.result) 190 except TimeLimitIsTooSmall as ex: 191 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 192 if self.tracer is None: 193 try: 194 self.tracer = Tracer(self.time_atom.result) 195 except TimeLimitIsTooSmall as ex: 196 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 197 if not self.tracer.iter(): 198 self.tracer = None 199 self.interface = None 200 await self.interface(self.service)
Inherited Members
247class LoopYieldPriorityScheduler(TypedService[None], EntityStatsMixin): 248 def __init__(self, loop: CoroSchedulerType): 249 super(LoopYieldPriorityScheduler, self).__init__(loop) 250 251 # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global) # Todo: switch to local coro del handler 252 253 self._request_workers = { 254 0: self._on_register, 255 1: self._on_setup, 256 2: self._on_change_priority, 257 3: self._on_get, 258 4: self._on_register_external, 259 5: self._on_register_external_asyncio_task, 260 6: self._on_change_priority_external, 261 7: self._on_del_external, 262 8: self._request_coro_kill, 263 9: self._kill_coro, 264 } 265 266 self.sigma = { 267 0: 0.6827, 268 1: 0.9545 - 0.6827, 269 2: 1.0 - 0.9545, 270 } 271 272 self.max_delay = 0.01 273 self.max_delays = { 274 0: 0.0, 275 1: 0.0, 276 2: 0.0, 277 } 278 self.compute_delays() 279 280 self.all_yield_objects = dict() # type: Dict[CoroID, LoopYieldManagedBase] 281 self.task_counter = Counter() 282 while self.task_counter.get() <= 0: 283 pass 284 285 self.yields_num: int = 0 286 self.coroutines_requested_to_be_deleted: Set[CoroID] = set() 287 self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict() 288 self.finished_waiters_for_coro_kill: Set[CoroID] = set() 289 self.asyncio_task_ids: Dict[Hashable, int] = dict() 290 291 self.yields_by_priority: Dict[CoroPriority, int] = { 292 CoroPriority.high: 0, 293 CoroPriority.normal: 0, 294 CoroPriority.low: 0, 295 } 296 297 self.time_atom_by_priority = { 298 CoroPriority.high: ValueExistence(True, self.max_delays[0]), 299 CoroPriority.normal: ValueExistence(True, self.max_delays[1]), 300 CoroPriority.low: ValueExistence(True, self.max_delays[2]), 301 } 302 303 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 304 coroutines_requested_to_be_killed = self.coroutines_requested_to_be_deleted | set(self.coroutines_requested_to_be_deleted_by_waiters.keys()) 305 return type(self).__name__, { 306 'task counter': self.task_counter._index, 307 'yields num': self.yields_num, 308 'max_delay': self.max_delay, 309 'max_delays': self.max_delays, 310 'affected coroutines num': len(self.all_yield_objects), 311 'coroutines num by priority': { 312 'high': self.yields_by_priority[CoroPriority.high], 313 'normal': self.yields_by_priority[CoroPriority.normal], 314 'low': self.yields_by_priority[CoroPriority.low], 315 }, 316 'time atoms by priority': { 317 'high': self.time_atom_by_priority[CoroPriority.high].result, 318 'normal': self.time_atom_by_priority[CoroPriority.normal].result, 319 'low': self.time_atom_by_priority[CoroPriority.low].result, 320 }, 321 'coroutines requested to be killed': { 322 'num': len(coroutines_requested_to_be_killed), 323 'list': coroutines_requested_to_be_killed, 324 } 325 } 326 327 def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None 328 ) -> Tuple[bool, Any, None]: 329 self.yields_num += 1 330 coro_should_be_killed = False 331 coro_id = self.current_caller_coro_info.coro_id 332 if coro_id in self.coroutines_requested_to_be_deleted: 333 coro_should_be_killed = True 334 335 if coro_id in self.coroutines_requested_to_be_deleted_by_waiters: 336 self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id] 337 self.make_live() 338 coro_should_be_killed = True 339 340 if request is not None: 341 if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]): 342 return True, None, ThisCoroWasRequestedToBeKilled 343 344 return self.resolve_request(request) 345 346 if coro_should_be_killed: 347 return True, None, ThisCoroWasRequestedToBeKilled 348 349 return True, None, None 350 351 def full_processing_iteration(self): 352 for coro_id in self.finished_waiters_for_coro_kill: 353 self.register_response(coro_id, None) 354 355 self.compute_time_atoms() 356 357 def compute_time_atoms(self): 358 if self.all_yield_objects: 359 top_sigma = 0 360 361 if self.yields_by_priority[CoroPriority.high]: 362 top_sigma += 1 363 364 if self.yields_by_priority[CoroPriority.normal]: 365 top_sigma += 1 366 367 if self.yields_by_priority[CoroPriority.low]: 368 top_sigma += 1 369 370 median_time_atom = 1 / len(self.all_yield_objects) # !!! Possible division by zero! Conditional must not be removed! 371 if 1 == top_sigma: 372 sigma_0_time_atom = min(median_time_atom, self.max_delays[0]) 373 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 374 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 375 self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom 376 elif 2 == top_sigma: 377 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 378 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 379 if self.yields_by_priority[CoroPriority.high]: 380 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 381 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 382 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 383 else: 384 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 385 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 386 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 387 elif 3 == top_sigma: 388 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 389 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 390 sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2]) 391 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 392 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 393 self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom 394 395 self.make_dead() 396 397 def in_work(self) -> bool: 398 result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects) 399 return self.thrifty_in_work(result) 400 401 def compute_delays(self): 402 self.max_delays = { 403 0: self.max_delay * self.sigma[0], 404 1: self.max_delay * self.sigma[1], 405 2: self.max_delay * self.sigma[2], 406 } 407 408 def _on_register(self, default_priority: CoroPriority): 409 task_id = self.current_caller_coro_info.coro_id 410 if task_id in self.all_yield_objects: 411 loop_yield = self.all_yield_objects[task_id] 412 else: 413 interface: Interface = self.current_caller_coro_info.coro.interface 414 if isinstance(interface, InterfaceGreenlet): 415 loop_yield = LoopYieldManaged(interface, 416 self.time_atom_by_priority[default_priority], 417 default_priority, 418 type(self)) 419 elif isinstance(interface, InterfaceAsyncAwait): 420 loop_yield = LoopYieldManagedAsync(interface, 421 self.time_atom_by_priority[default_priority], 422 default_priority, 423 type(self)) 424 else: 425 raise NotImplementedError 426 427 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 428 self.all_yield_objects[interface.coro_id] = loop_yield 429 self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler) 430 self.yields_by_priority[loop_yield.priority] += 1 431 self.make_live() 432 433 return True, loop_yield, None 434 435 def _on_setup(self, max_delay: float): 436 self.max_delay = max_delay 437 self.compute_delays() 438 # self.compute_time_atoms() 439 self.make_live() 440 return True, None, None 441 442 def _on_change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority): 443 self.yields_by_priority[old_priority] -= 1 444 self.yields_by_priority[new_priority] += 1 445 loop_yield = self.all_yield_objects[self.current_caller_coro_info.coro_id] 446 loop_yield.time_atom = self.time_atom_by_priority[new_priority] 447 self.make_live() 448 return True, None, None 449 450 def _on_get(self): 451 return True, self.all_yield_objects.get(self.current_caller_coro_info.coro_id), None 452 453 def get_yield_object(self, coro_id: CoroID) -> LoopYieldManagedBase: 454 return self.all_yield_objects.get(coro_id) 455 456 def _on_register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority): 457 task_id = -(self.task_counter.get()) 458 loop_yield = LoopYieldManagedAsyncExternal(task_id, 459 self.time_atom_by_priority[default_priority], 460 default_priority, 461 type(self), 462 self._loop, 463 asyncio_loop) 464 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 465 self.all_yield_objects[task_id] = loop_yield 466 self.yields_by_priority[loop_yield.priority] += 1 467 self.make_live() 468 return True, loop_yield, None 469 470 def _on_register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority): 471 asyncio_task_id = id(task) 472 if asyncio_task_id not in self.asyncio_task_ids: 473 self.asyncio_task_ids[asyncio_task_id] = -(self.task_counter.get()) 474 475 task_id = self.asyncio_task_ids[asyncio_task_id] 476 if task_id in self.all_yield_objects: 477 loop_yield = self.all_yield_objects[task_id] 478 else: 479 loop_yield = LoopYieldManagedAsyncExternal(task_id, 480 self.time_atom_by_priority[default_priority], 481 default_priority, 482 type(self), 483 self._loop, 484 asyncio_loop) 485 def on_done_asyncio_coro(future): 486 self._on_del_external(loop_yield) 487 task.remove_done_callback(loop_yield.on_done_asyncio_coro) 488 489 loop_yield.asyncio_task = task 490 loop_yield.on_done_asyncio_coro = on_done_asyncio_coro 491 task.add_done_callback(loop_yield.on_done_asyncio_coro) 492 loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority] 493 self.all_yield_objects[task_id] = loop_yield 494 self.yields_by_priority[loop_yield.priority] += 1 495 self.make_live() 496 497 return True, loop_yield, None 498 499 def _on_change_priority_external(self, loop_yield: LoopYieldManagedAsyncExternal, new_priority: CoroPriority, old_priority: CoroPriority): 500 self.yields_by_priority[old_priority] -= 1 501 self.yields_by_priority[new_priority] += 1 502 loop_yield.time_atom = self.time_atom_by_priority[new_priority] 503 self.make_live() 504 return True, None, None 505 506 def _on_del_external(self, loop_yield: LoopYieldManagedAsyncExternal): 507 task_id = loop_yield.task_id 508 if task_id in self.all_yield_objects: 509 priority = self.all_yield_objects[task_id].priority 510 del self.all_yield_objects[task_id] 511 self.yields_by_priority[priority] -= 1 512 self.make_live() 513 return True, None, None 514 515 def _request_coro_kill(self, coro_id: CoroID) -> ServiceProcessingResponse: 516 self.coroutines_requested_to_be_deleted.add(coro_id) 517 return True, None, None 518 519 def _kill_coro(self, coro_id: CoroID) -> ServiceProcessingResponse: 520 waiter_coro_id = self.current_caller_coro_info 521 if coro_id not in self.coroutines_requested_to_be_deleted_by_waiters: 522 self.coroutines_requested_to_be_deleted_by_waiters[coro_id] = set() 523 524 self.coroutines_requested_to_be_deleted_by_waiters[coro_id].add(waiter_coro_id) 525 return False, None, None 526 527 def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool: 528 coro_id = coro.coro_id 529 if coro_id in self.all_yield_objects: 530 priority = self.all_yield_objects[coro_id].priority 531 del self.all_yield_objects[coro_id] 532 self.yields_by_priority[priority] -= 1 533 self.make_live() 534 return False 535 536 def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool: 537 coro_id = coro.coro_id 538 priority = self.all_yield_objects[coro_id].priority 539 del self.all_yield_objects[coro_id] 540 self.yields_by_priority[priority] -= 1 541 self.make_live() 542 return False
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
248 def __init__(self, loop: CoroSchedulerType): 249 super(LoopYieldPriorityScheduler, self).__init__(loop) 250 251 # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global) # Todo: switch to local coro del handler 252 253 self._request_workers = { 254 0: self._on_register, 255 1: self._on_setup, 256 2: self._on_change_priority, 257 3: self._on_get, 258 4: self._on_register_external, 259 5: self._on_register_external_asyncio_task, 260 6: self._on_change_priority_external, 261 7: self._on_del_external, 262 8: self._request_coro_kill, 263 9: self._kill_coro, 264 } 265 266 self.sigma = { 267 0: 0.6827, 268 1: 0.9545 - 0.6827, 269 2: 1.0 - 0.9545, 270 } 271 272 self.max_delay = 0.01 273 self.max_delays = { 274 0: 0.0, 275 1: 0.0, 276 2: 0.0, 277 } 278 self.compute_delays() 279 280 self.all_yield_objects = dict() # type: Dict[CoroID, LoopYieldManagedBase] 281 self.task_counter = Counter() 282 while self.task_counter.get() <= 0: 283 pass 284 285 self.yields_num: int = 0 286 self.coroutines_requested_to_be_deleted: Set[CoroID] = set() 287 self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict() 288 self.finished_waiters_for_coro_kill: Set[CoroID] = set() 289 self.asyncio_task_ids: Dict[Hashable, int] = dict() 290 291 self.yields_by_priority: Dict[CoroPriority, int] = { 292 CoroPriority.high: 0, 293 CoroPriority.normal: 0, 294 CoroPriority.low: 0, 295 } 296 297 self.time_atom_by_priority = { 298 CoroPriority.high: ValueExistence(True, self.max_delays[0]), 299 CoroPriority.normal: ValueExistence(True, self.max_delays[1]), 300 CoroPriority.low: ValueExistence(True, self.max_delays[2]), 301 }
327 def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None 328 ) -> Tuple[bool, Any, None]: 329 self.yields_num += 1 330 coro_should_be_killed = False 331 coro_id = self.current_caller_coro_info.coro_id 332 if coro_id in self.coroutines_requested_to_be_deleted: 333 coro_should_be_killed = True 334 335 if coro_id in self.coroutines_requested_to_be_deleted_by_waiters: 336 self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id] 337 self.make_live() 338 coro_should_be_killed = True 339 340 if request is not None: 341 if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]): 342 return True, None, ThisCoroWasRequestedToBeKilled 343 344 return self.resolve_request(request) 345 346 if coro_should_be_killed: 347 return True, None, ThisCoroWasRequestedToBeKilled 348 349 return True, None, None
357 def compute_time_atoms(self): 358 if self.all_yield_objects: 359 top_sigma = 0 360 361 if self.yields_by_priority[CoroPriority.high]: 362 top_sigma += 1 363 364 if self.yields_by_priority[CoroPriority.normal]: 365 top_sigma += 1 366 367 if self.yields_by_priority[CoroPriority.low]: 368 top_sigma += 1 369 370 median_time_atom = 1 / len(self.all_yield_objects) # !!! Possible division by zero! Conditional must not be removed! 371 if 1 == top_sigma: 372 sigma_0_time_atom = min(median_time_atom, self.max_delays[0]) 373 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 374 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 375 self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom 376 elif 2 == top_sigma: 377 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 378 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 379 if self.yields_by_priority[CoroPriority.high]: 380 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 381 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 382 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 383 else: 384 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 385 self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom 386 self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom 387 elif 3 == top_sigma: 388 sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0]) 389 sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1]) 390 sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2]) 391 self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom 392 self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom 393 self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom 394 395 self.make_dead()
397 def in_work(self) -> bool: 398 result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects) 399 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
- get_entity_stats
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
548def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]: 549 loop = CoroScheduler.current_loop() 550 if loop is None: 551 return FakeLoopYieldManaged() # running not from inside the loop 552 553 interface = loop.current_interface() 554 if interface is None: 555 return FakeLoopYieldManaged() # running from Service 556 557 # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 558 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 559 if ly is None: 560 ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 561 562 return ly
548def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]: 549 loop = CoroScheduler.current_loop() 550 if loop is None: 551 return FakeLoopYieldManaged() # running not from inside the loop 552 553 interface = loop.current_interface() 554 if interface is None: 555 return FakeLoopYieldManaged() # running from Service 556 557 # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 558 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 559 if ly is None: 560 ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 561 562 return ly
568async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]: 569 loop = CoroScheduler.current_loop() 570 if loop is None: 571 return FakeLoopYieldManagedAsync() # running not from inside the loop 572 573 interface = loop.current_interface() 574 if interface is None: 575 return FakeLoopYieldManagedAsync() # running from Service 576 577 # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 578 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 579 if ly is None: 580 ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 581 582 return ly
568async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]: 569 loop = CoroScheduler.current_loop() 570 if loop is None: 571 return FakeLoopYieldManagedAsync() # running not from inside the loop 572 573 interface = loop.current_interface() 574 if interface is None: 575 return FakeLoopYieldManagedAsync() # running from Service 576 577 # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get()) 578 ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id) 579 if ly is None: 580 ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority)) 581 582 return ly
203class LoopYieldManagedAsyncExternal(LoopYieldManagedBase): 204 def __init__(self, task_id: int, time_atom: ValueExistence, 205 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop): 206 super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service) 207 self.task_id = task_id 208 self.coro_scheduler = coro_scheduler 209 self.asyncio_loop = asyncio_loop 210 self.asyncio_task: asyncio.Task = None 211 self.on_done_asyncio_coro: Callable = None 212 213 async def __call__(self, priority: Optional[CoroPriority] = None): 214 if priority is None: 215 priority = self.default_priority 216 if priority != self.priority: 217 await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service, 218 LoopYieldPrioritySchedulerRequest().change_priority_external(self, priority, self.priority)) 219 self.priority = priority 220 try: 221 self.tracer = Tracer(self.time_atom.result) 222 except TimeLimitIsTooSmall as ex: 223 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 224 if self.tracer is None: 225 try: 226 self.tracer = Tracer(self.time_atom.result) 227 except TimeLimitIsTooSmall as ex: 228 self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME) 229 if not self.tracer.iter(): 230 self.tracer = None 231 await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service)
204 def __init__(self, task_id: int, time_atom: ValueExistence, 205 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop): 206 super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service) 207 self.task_id = task_id 208 self.coro_scheduler = coro_scheduler 209 self.asyncio_loop = asyncio_loop 210 self.asyncio_task: asyncio.Task = None 211 self.on_done_asyncio_coro: Callable = None
Inherited Members
588@asynccontextmanager 589@async_generator 590async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None): 591 if coro_scheduler is None: 592 coro_scheduler = CoroScheduler.current_loop() 593 594 if coro_scheduler is None: 595 await yield_(FakeLoopYieldManagedAsync()) # can not determine coro scheduler loop 596 else: 597 if asyncio_loop is None: 598 asyncio_loop = asyncio.get_event_loop() 599 600 if (3, 7) <= sys.version_info: 601 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 602 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 603 LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority)) 604 else: 605 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 606 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 607 LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority)) 608 609 try: 610 await yield_(ly) 611 finally: 612 if (3, 7) > sys.version_info: 613 await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 614 LoopYieldPrioritySchedulerRequest().del_external(ly))
588@asynccontextmanager 589@async_generator 590async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None): 591 if coro_scheduler is None: 592 coro_scheduler = CoroScheduler.current_loop() 593 594 if coro_scheduler is None: 595 await yield_(FakeLoopYieldManagedAsync()) # can not determine coro scheduler loop 596 else: 597 if asyncio_loop is None: 598 asyncio_loop = asyncio.get_event_loop() 599 600 if (3, 7) <= sys.version_info: 601 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 602 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 603 LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority)) 604 else: 605 ly: LoopYieldManagedAsyncExternal = await await_task_fast( 606 asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 607 LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority)) 608 609 try: 610 await yield_(ly) 611 finally: 612 if (3, 7) > sys.version_info: 613 await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler, 614 LoopYieldPrioritySchedulerRequest().del_external(ly))