cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = [
 38    'CoroPriority', 'ThisCoroWasRequestedToBeKilled', 'LoopYieldPrioritySchedulerRequest', 'LoopYieldManagedBase', 'LoopYieldManaged',
 39    'LoopYieldManagedAsync', 'FakeLoopYieldManaged', 'LoopYieldPriorityScheduler', 'get_loop_yield',
 40    'gly', 'aget_loop_yield', 'agly', 'LoopYieldManagedAsyncExternal', 'FakeLoopYieldManagedAsync', 'external_aget_loop_yield', 'eagly'
 41]
 42
 43from asyncio.events import Handle
 44from cengal.parallel_execution.coroutines.coro_scheduler import *
 45from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
 46from enum import Enum
 47from typing import Callable, Dict, Tuple, Union, Type, Optional, Any, Set, Hashable
 48from cengal.time_management.repeat_for_a_time import Tracer, TimeLimitIsTooSmall
 49from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence
 50from async_generator import asynccontextmanager, async_generator, yield_
 51import asyncio
 52import sys
 53
 54
 55MIN_TIME = 0.000001
 56
 57
 58class CoroPriority(Enum):
 59    high = 0
 60    normal = 1
 61    low = 2
 62
 63
 64class LoopYieldPrioritySchedulerRequest(ServiceRequest):
 65    def register(self, default_priority: CoroPriority) -> 'LoopYieldManagedBase':
 66        return self._save(0, default_priority)
 67
 68    def setup(self, max_delay: float) -> None:
 69        return self._save(1, max_delay)
 70
 71    def change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority) -> None:
 72        return self._save(2, new_priority, old_priority)
 73
 74    def get(self) -> 'LoopYieldManagedBase':
 75        return self._save(3)
 76
 77    def register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
 78        return self._save(4, asyncio_loop, default_priority)
 79
 80    def register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
 81        return self._save(5, asyncio_loop, task, default_priority)
 82
 83    def change_priority_external(self, loop_yield: 'LoopYieldManagedAsyncExternal', new_priority: CoroPriority, old_priority: CoroPriority) -> None:
 84        return self._save(6, loop_yield, new_priority, old_priority)
 85
 86    def del_external(self, loop_yield: 'LoopYieldManagedAsyncExternal') -> None:
 87        return self._save(7, loop_yield)
 88
 89    def request_coro_kill(self, coro_id: CoroID) -> None:
 90        """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service
 91
 92        Args:
 93            coro_id (CoroID): _description_
 94
 95        Returns:
 96            ServiceRequest: _description_
 97        """        
 98        return self._save(8, coro_id)
 99    
100    def kill_coro(self, coro_id: CoroID) -> None:
101        """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None).
102
103        Args:
104            coro_id (CoroID): _description_
105
106        Returns:
107            ServiceRequest: _description_
108        """        
109        return self._save(9, coro_id)
110
111
112class LoopYieldManagedBase:
113    def __init__(self, interface: Interface, time_atom: ValueExistence,
114                 default_priority: CoroPriority, service: Type[Service]):
115        self._interface = None
116        self.interface = interface
117        self.time_atom = time_atom
118        self.default_priority = default_priority
119        self.priority = self.default_priority
120        self.service = service
121        self.tracer = None
122
123    @property
124    def interface(self):
125        if self._interface is None:
126            self._interface = current_interface()
127        
128        return self._interface
129
130    @interface.setter
131    def interface(self, value):
132        self._interface = value
133
134
135class LoopYieldManaged(LoopYieldManagedBase):
136    def __init__(self, interface: Interface, time_atom: ValueExistence,
137                 default_priority: CoroPriority, service: Type[Service]):
138        super(LoopYieldManaged, self).__init__(interface, time_atom, default_priority, service)
139
140    def __call__(self, priority: Optional[CoroPriority] = None):
141        if priority is None:
142            priority = self.default_priority
143        
144        if priority != self.priority:
145            self.interface = None
146            self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority,
147                                                                                             self.priority))
148            self.priority = priority
149            try:
150                self.tracer = Tracer(self.time_atom.result)
151            except TimeLimitIsTooSmall as ex:
152                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
153        
154        if self.tracer is None:
155            exception = None
156            try:
157                self.tracer = Tracer(self.time_atom.result)
158            except TimeLimitIsTooSmall as ex:
159                exception = ex
160
161            if exception is not None:
162                try:
163                    self.tracer = Tracer(exception.min_time if exception.min_time is not None else MIN_TIME)
164                except TimeLimitIsTooSmall as ex:
165                    print(ex)
166                
167        if self.tracer is not None:
168            if not self.tracer.iter():
169                self.tracer = None
170                self.interface = None
171                self.interface(self.service)
172
173
174class LoopYieldManagedAsync(LoopYieldManagedBase):
175    def __init__(self, interface: Interface, time_atom: ValueExistence,
176                 default_priority: CoroPriority, service: Type[Service]):
177        super(LoopYieldManagedAsync, self).__init__(interface, time_atom, default_priority, service)
178
179    async def __call__(self, priority: Optional[CoroPriority] = None):
180        if priority is None:
181            priority = self.default_priority
182        if priority != self.priority:
183            self.interface = None
184            await self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority,
185                                                                                                   self.priority))
186            self.priority = priority
187            try:
188                self.tracer = Tracer(self.time_atom.result)
189            except TimeLimitIsTooSmall as ex:
190                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
191        if self.tracer is None:
192            try:
193                self.tracer = Tracer(self.time_atom.result)
194            except TimeLimitIsTooSmall as ex:
195                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
196        if not self.tracer.iter():
197            self.tracer = None
198            self.interface = None
199            await self.interface(self.service)
200
201
202class LoopYieldManagedAsyncExternal(LoopYieldManagedBase):
203    def __init__(self, task_id: int, time_atom: ValueExistence,
204                 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop):
205        super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service)
206        self.task_id = task_id
207        self.coro_scheduler = coro_scheduler
208        self.asyncio_loop = asyncio_loop
209        self.asyncio_task: asyncio.Task = None
210        self.on_done_asyncio_coro: Callable = None
211
212    async def __call__(self, priority: Optional[CoroPriority] = None):
213        if priority is None:
214            priority = self.default_priority
215        if priority != self.priority:
216            await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service,
217                                  LoopYieldPrioritySchedulerRequest().change_priority_external(self, priority, self.priority))
218            self.priority = priority
219            try:
220                self.tracer = Tracer(self.time_atom.result)
221            except TimeLimitIsTooSmall as ex:
222                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
223        if self.tracer is None:
224            try:
225                self.tracer = Tracer(self.time_atom.result)
226            except TimeLimitIsTooSmall as ex:
227                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
228        if not self.tracer.iter():
229            self.tracer = None
230            await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service)
231
232
233class FakeLoopYieldManaged:
234    def __call__(self, priority: Optional[CoroPriority] = None):
235        pass
236
237class FakeLoopYieldManagedAsync:
238    async def __call__(self, priority: Optional[CoroPriority] = None):
239        pass
240
241
242class ThisCoroWasRequestedToBeKilled(Exception):
243    pass
244
245
246class LoopYieldPriorityScheduler(TypedService[None], EntityStatsMixin):
247    def __init__(self, loop: CoroSchedulerType):
248        super(LoopYieldPriorityScheduler, self).__init__(loop)
249
250        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
251
252        self._request_workers = {
253            0: self._on_register,
254            1: self._on_setup,
255            2: self._on_change_priority,
256            3: self._on_get,
257            4: self._on_register_external,
258            5: self._on_register_external_asyncio_task,
259            6: self._on_change_priority_external,
260            7: self._on_del_external,
261            8: self._request_coro_kill,
262            9: self._kill_coro,
263        }
264
265        self.sigma = {
266            0: 0.6827,
267            1: 0.9545 - 0.6827,
268            2: 1.0 - 0.9545,
269        }
270
271        self.max_delay = 0.01
272        self.max_delays = {
273            0: 0.0,
274            1: 0.0,
275            2: 0.0,
276        }
277        self.compute_delays()
278
279        self.all_yield_objects = dict()  # type: Dict[CoroID, LoopYieldManagedBase]
280        self.task_counter = Counter()
281        while self.task_counter.get() <= 0:
282            pass
283        
284        self.yields_num: int = 0
285        self.coroutines_requested_to_be_deleted: Set[CoroID] = set()
286        self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict()
287        self.finished_waiters_for_coro_kill: Set[CoroID] = set()
288        self.asyncio_task_ids: Dict[Hashable, int] = dict()
289
290        self.yields_by_priority: Dict[CoroPriority, int] = {
291            CoroPriority.high:   0,
292            CoroPriority.normal: 0,
293            CoroPriority.low:    0,
294        }
295
296        self.time_atom_by_priority = {
297            CoroPriority.high:   ValueExistence(True, self.max_delays[0]),
298            CoroPriority.normal: ValueExistence(True, self.max_delays[1]),
299            CoroPriority.low:    ValueExistence(True, self.max_delays[2]),
300        }
301
302    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
303        coroutines_requested_to_be_killed = self.coroutines_requested_to_be_deleted | set(self.coroutines_requested_to_be_deleted_by_waiters.keys())
304        return type(self).__name__, {
305            'task counter': self.task_counter._index,
306            'yields num': self.yields_num,
307            'max_delay': self.max_delay,
308            'max_delays': self.max_delays,
309            'affected coroutines num': len(self.all_yield_objects),
310            'coroutines num by priority': {
311                'high': self.yields_by_priority[CoroPriority.high],
312                'normal': self.yields_by_priority[CoroPriority.normal],
313                'low': self.yields_by_priority[CoroPriority.low],
314            },
315            'time atoms by priority': {
316                'high': self.time_atom_by_priority[CoroPriority.high].result,
317                'normal': self.time_atom_by_priority[CoroPriority.normal].result,
318                'low': self.time_atom_by_priority[CoroPriority.low].result,
319            },
320            'coroutines requested to be killed': {
321                'num': len(coroutines_requested_to_be_killed),
322                'list': coroutines_requested_to_be_killed,
323            }
324        }
325
326    def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None
327                                                         ) -> Tuple[bool, Any, None]:
328        self.yields_num += 1
329        coro_should_be_killed = False
330        coro_id = self.current_caller_coro_info.coro_id
331        if coro_id in self.coroutines_requested_to_be_deleted:
332            coro_should_be_killed = True
333        
334        if coro_id in self.coroutines_requested_to_be_deleted_by_waiters:
335            self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id]
336            self.make_live()
337            coro_should_be_killed = True
338        
339        if request is not None:
340            if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]):
341                return True, None, ThisCoroWasRequestedToBeKilled
342                
343            return self.resolve_request(request)
344
345        if coro_should_be_killed:
346            return True, None, ThisCoroWasRequestedToBeKilled
347        
348        return True, None, None
349
350    def full_processing_iteration(self):
351        for coro_id in self.finished_waiters_for_coro_kill:
352            self.register_response(coro_id, None)
353        
354        self.compute_time_atoms()
355
356    def compute_time_atoms(self):
357        if self.all_yield_objects:
358            top_sigma = 0
359
360            if self.yields_by_priority[CoroPriority.high]:
361                top_sigma += 1
362
363            if self.yields_by_priority[CoroPriority.normal]:
364                top_sigma += 1
365
366            if self.yields_by_priority[CoroPriority.low]:
367                top_sigma += 1
368
369            median_time_atom = 1 / len(self.all_yield_objects)  # !!! Possible division by zero! Conditional must not be removed!
370            if 1 == top_sigma:
371                sigma_0_time_atom = min(median_time_atom, self.max_delays[0])
372                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
373                self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
374                self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom
375            elif 2 == top_sigma:
376                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
377                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
378                if self.yields_by_priority[CoroPriority.high]:
379                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
380                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
381                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
382                else:
383                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
384                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
385                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
386            elif 3 == top_sigma:
387                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
388                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
389                sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2])
390                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
391                self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
392                self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom
393        
394        self.make_dead()
395
396    def in_work(self) -> bool:
397        result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects)
398        return self.thrifty_in_work(result)
399
400    def compute_delays(self):
401        self.max_delays = {
402            0: self.max_delay * self.sigma[0],
403            1: self.max_delay * self.sigma[1],
404            2: self.max_delay * self.sigma[2],
405        }
406
407    def _on_register(self, default_priority: CoroPriority):
408        task_id = self.current_caller_coro_info.coro_id
409        if task_id in self.all_yield_objects:
410            loop_yield = self.all_yield_objects[task_id]
411        else:
412            interface: Interface = self.current_caller_coro_info.coro.interface
413            if isinstance(interface, InterfaceGreenlet):
414                loop_yield = LoopYieldManaged(interface,
415                                            self.time_atom_by_priority[default_priority],
416                                            default_priority,
417                                            type(self))
418            elif isinstance(interface, InterfaceAsyncAwait):
419                loop_yield = LoopYieldManagedAsync(interface,
420                                                self.time_atom_by_priority[default_priority],
421                                                default_priority,
422                                                type(self))
423            else:
424                raise NotImplementedError
425            
426            loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
427            self.all_yield_objects[interface.coro_id] = loop_yield
428            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
429            self.yields_by_priority[loop_yield.priority] += 1
430            self.make_live()
431        
432        return True, loop_yield, None
433
434    def _on_setup(self, max_delay: float):
435        self.max_delay = max_delay
436        self.compute_delays()
437        # self.compute_time_atoms()
438        self.make_live()
439        return True, None, None
440
441    def _on_change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority):
442        self.yields_by_priority[old_priority] -= 1
443        self.yields_by_priority[new_priority] += 1
444        loop_yield = self.all_yield_objects[self.current_caller_coro_info.coro_id]
445        loop_yield.time_atom = self.time_atom_by_priority[new_priority]
446        self.make_live()
447        return True, None, None
448
449    def _on_get(self):
450        return True, self.all_yield_objects.get(self.current_caller_coro_info.coro_id), None
451
452    def get_yield_object(self, coro_id: CoroID) -> LoopYieldManagedBase:
453        return self.all_yield_objects.get(coro_id)
454    
455    def _on_register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority):
456        task_id = -(self.task_counter.get())
457        loop_yield = LoopYieldManagedAsyncExternal(task_id,
458                                                   self.time_atom_by_priority[default_priority],
459                                                   default_priority,
460                                                   type(self),
461                                                   self._loop,
462                                                   asyncio_loop)
463        loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
464        self.all_yield_objects[task_id] = loop_yield
465        self.yields_by_priority[loop_yield.priority] += 1
466        self.make_live()
467        return True, loop_yield, None
468    
469    def _on_register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority):
470        asyncio_task_id = id(task)
471        if asyncio_task_id not in self.asyncio_task_ids:
472            self.asyncio_task_ids[asyncio_task_id] = -(self.task_counter.get())
473            
474        task_id = self.asyncio_task_ids[asyncio_task_id]
475        if task_id in self.all_yield_objects:
476            loop_yield = self.all_yield_objects[task_id]
477        else:
478            loop_yield = LoopYieldManagedAsyncExternal(task_id,
479                                                    self.time_atom_by_priority[default_priority],
480                                                    default_priority,
481                                                    type(self),
482                                                    self._loop,
483                                                    asyncio_loop)
484            def on_done_asyncio_coro(future):
485                self._on_del_external(loop_yield)
486                task.remove_done_callback(loop_yield.on_done_asyncio_coro)
487            
488            loop_yield.asyncio_task = task
489            loop_yield.on_done_asyncio_coro = on_done_asyncio_coro
490            task.add_done_callback(loop_yield.on_done_asyncio_coro)
491            loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
492            self.all_yield_objects[task_id] = loop_yield
493            self.yields_by_priority[loop_yield.priority] += 1
494            self.make_live()
495        
496        return True, loop_yield, None
497    
498    def _on_change_priority_external(self, loop_yield: LoopYieldManagedAsyncExternal, new_priority: CoroPriority, old_priority: CoroPriority):
499        self.yields_by_priority[old_priority] -= 1
500        self.yields_by_priority[new_priority] += 1
501        loop_yield.time_atom = self.time_atom_by_priority[new_priority]
502        self.make_live()
503        return True, None, None
504    
505    def _on_del_external(self, loop_yield: LoopYieldManagedAsyncExternal):
506        task_id = loop_yield.task_id
507        if task_id in self.all_yield_objects:
508            priority = self.all_yield_objects[task_id].priority
509            del self.all_yield_objects[task_id]
510            self.yields_by_priority[priority] -= 1
511        self.make_live()
512        return True, None, None
513    
514    def _request_coro_kill(self, coro_id: CoroID) -> ServiceProcessingResponse:
515        self.coroutines_requested_to_be_deleted.add(coro_id)
516        return True, None, None
517    
518    def _kill_coro(self, coro_id: CoroID) -> ServiceProcessingResponse:
519        waiter_coro_id = self.current_caller_coro_info
520        if coro_id not in self.coroutines_requested_to_be_deleted_by_waiters:
521            self.coroutines_requested_to_be_deleted_by_waiters[coro_id] = set()
522        
523        self.coroutines_requested_to_be_deleted_by_waiters[coro_id].add(waiter_coro_id)
524        return False, None, None
525
526    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
527        coro_id = coro.coro_id
528        if coro_id in self.all_yield_objects:
529            priority = self.all_yield_objects[coro_id].priority
530            del self.all_yield_objects[coro_id]
531            self.yields_by_priority[priority] -= 1
532            self.make_live()
533        return False
534
535    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
536        coro_id = coro.coro_id
537        priority = self.all_yield_objects[coro_id].priority
538        del self.all_yield_objects[coro_id]
539        self.yields_by_priority[priority] -= 1
540        self.make_live()
541        return False
542
543
544LoopYieldPrioritySchedulerRequest.default_service_type = LoopYieldPriorityScheduler
545
546
547def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]:
548    loop = CoroScheduler.current_loop()
549    if loop is None:
550        return FakeLoopYieldManaged()  # running not from inside the loop
551
552    interface = loop.current_interface()
553    if interface is None:
554        return FakeLoopYieldManaged()  # running from Service
555
556    # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
557    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
558    if ly is None:
559        ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
560    
561    return ly
562
563
564gly = get_loop_yield
565
566
567async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]:
568    loop = CoroScheduler.current_loop()
569    if loop is None:
570        return FakeLoopYieldManagedAsync()  # running not from inside the loop
571
572    interface = loop.current_interface()
573    if interface is None:
574        return FakeLoopYieldManagedAsync()  # running from Service
575
576    # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
577    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
578    if ly is None:
579        ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
580    
581    return ly
582
583
584agly = aget_loop_yield
585
586
587@asynccontextmanager
588@async_generator
589async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None):
590    if coro_scheduler is None:
591        coro_scheduler = CoroScheduler.current_loop()
592    
593    if coro_scheduler is None:
594        await yield_(FakeLoopYieldManagedAsync())  # can not determine coro scheduler loop
595    else:
596        if asyncio_loop is None:
597            asyncio_loop = asyncio.get_event_loop()
598        
599        if (3, 7) <= sys.version_info:
600            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
601                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
602                LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority))
603        else:
604            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
605                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
606                LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority))
607
608        try:
609            await yield_(ly)
610        finally:
611            if (3, 7) > sys.version_info:
612                await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
613                                    LoopYieldPrioritySchedulerRequest().del_external(ly))
614
615eagly = external_aget_loop_yield
class CoroPriority(enum.Enum):
59class CoroPriority(Enum):
60    high = 0
61    normal = 1
62    low = 2

An enumeration.

high = <CoroPriority.high: 0>
normal = <CoroPriority.normal: 1>
low = <CoroPriority.low: 2>
Inherited Members
enum.Enum
name
value
class ThisCoroWasRequestedToBeKilled(builtins.Exception):
243class ThisCoroWasRequestedToBeKilled(Exception):
244    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class LoopYieldPrioritySchedulerRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
 65class LoopYieldPrioritySchedulerRequest(ServiceRequest):
 66    def register(self, default_priority: CoroPriority) -> 'LoopYieldManagedBase':
 67        return self._save(0, default_priority)
 68
 69    def setup(self, max_delay: float) -> None:
 70        return self._save(1, max_delay)
 71
 72    def change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority) -> None:
 73        return self._save(2, new_priority, old_priority)
 74
 75    def get(self) -> 'LoopYieldManagedBase':
 76        return self._save(3)
 77
 78    def register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
 79        return self._save(4, asyncio_loop, default_priority)
 80
 81    def register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
 82        return self._save(5, asyncio_loop, task, default_priority)
 83
 84    def change_priority_external(self, loop_yield: 'LoopYieldManagedAsyncExternal', new_priority: CoroPriority, old_priority: CoroPriority) -> None:
 85        return self._save(6, loop_yield, new_priority, old_priority)
 86
 87    def del_external(self, loop_yield: 'LoopYieldManagedAsyncExternal') -> None:
 88        return self._save(7, loop_yield)
 89
 90    def request_coro_kill(self, coro_id: CoroID) -> None:
 91        """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service
 92
 93        Args:
 94            coro_id (CoroID): _description_
 95
 96        Returns:
 97            ServiceRequest: _description_
 98        """        
 99        return self._save(8, coro_id)
100    
101    def kill_coro(self, coro_id: CoroID) -> None:
102        """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None).
103
104        Args:
105            coro_id (CoroID): _description_
106
107        Returns:
108            ServiceRequest: _description_
109        """        
110        return self._save(9, coro_id)
def register( self, default_priority: CoroPriority) -> LoopYieldManagedBase:
66    def register(self, default_priority: CoroPriority) -> 'LoopYieldManagedBase':
67        return self._save(0, default_priority)
def setup(self, max_delay: float) -> None:
69    def setup(self, max_delay: float) -> None:
70        return self._save(1, max_delay)
def change_priority( self, new_priority: CoroPriority, old_priority: CoroPriority) -> None:
72    def change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority) -> None:
73        return self._save(2, new_priority, old_priority)
def get( self) -> LoopYieldManagedBase:
75    def get(self) -> 'LoopYieldManagedBase':
76        return self._save(3)
def register_external( self, asyncio_loop: asyncio.events.AbstractEventLoop, default_priority: CoroPriority) -> LoopYieldManagedAsyncExternal:
78    def register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
79        return self._save(4, asyncio_loop, default_priority)
def register_external_asyncio_task( self, asyncio_loop: asyncio.events.AbstractEventLoop, task: _asyncio.Task, default_priority: CoroPriority) -> LoopYieldManagedAsyncExternal:
81    def register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority) -> 'LoopYieldManagedAsyncExternal':
82        return self._save(5, asyncio_loop, task, default_priority)
def change_priority_external( self, loop_yield: LoopYieldManagedAsyncExternal, new_priority: CoroPriority, old_priority: CoroPriority) -> None:
84    def change_priority_external(self, loop_yield: 'LoopYieldManagedAsyncExternal', new_priority: CoroPriority, old_priority: CoroPriority) -> None:
85        return self._save(6, loop_yield, new_priority, old_priority)
def del_external( self, loop_yield: LoopYieldManagedAsyncExternal) -> None:
87    def del_external(self, loop_yield: 'LoopYieldManagedAsyncExternal') -> None:
88        return self._save(7, loop_yield)
def request_coro_kill(self, coro_id: int) -> None:
90    def request_coro_kill(self, coro_id: CoroID) -> None:
91        """Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service
92
93        Args:
94            coro_id (CoroID): _description_
95
96        Returns:
97            ServiceRequest: _description_
98        """        
99        return self._save(8, coro_id)

Make request for a coro kill asynchronously (immidiately returns). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service

Args: coro_id (CoroID): _description_

Returns: ServiceRequest: _description_

def kill_coro(self, coro_id: int) -> None:
101    def kill_coro(self, coro_id: CoroID) -> None:
102        """Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None).
103
104        Args:
105            coro_id (CoroID): _description_
106
107        Returns:
108            ServiceRequest: _description_
109        """        
110        return self._save(9, coro_id)

Make request for a coro kill synchronously (waits for next steps). An exception ThisCoroWasRequestedToBeKilled will be emmited in coro with a requested coro_id during one of it's next requests to LoopYield service. After an exception was sent to the requested coro_id, current coro will receive an answer from the service (None).

Args: coro_id (CoroID): _description_

Returns: ServiceRequest: _description_

default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'LoopYieldPriorityScheduler'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class LoopYieldManagedBase:
113class LoopYieldManagedBase:
114    def __init__(self, interface: Interface, time_atom: ValueExistence,
115                 default_priority: CoroPriority, service: Type[Service]):
116        self._interface = None
117        self.interface = interface
118        self.time_atom = time_atom
119        self.default_priority = default_priority
120        self.priority = self.default_priority
121        self.service = service
122        self.tracer = None
123
124    @property
125    def interface(self):
126        if self._interface is None:
127            self._interface = current_interface()
128        
129        return self._interface
130
131    @interface.setter
132    def interface(self, value):
133        self._interface = value
LoopYieldManagedBase( interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, time_atom: cengal.code_flow_control.smart_values.versions.v_1.smart_values.ValueExistence, default_priority: CoroPriority, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service])
114    def __init__(self, interface: Interface, time_atom: ValueExistence,
115                 default_priority: CoroPriority, service: Type[Service]):
116        self._interface = None
117        self.interface = interface
118        self.time_atom = time_atom
119        self.default_priority = default_priority
120        self.priority = self.default_priority
121        self.service = service
122        self.tracer = None
interface
124    @property
125    def interface(self):
126        if self._interface is None:
127            self._interface = current_interface()
128        
129        return self._interface
time_atom
default_priority
priority
service
tracer
class LoopYieldManaged(LoopYieldManagedBase):
136class LoopYieldManaged(LoopYieldManagedBase):
137    def __init__(self, interface: Interface, time_atom: ValueExistence,
138                 default_priority: CoroPriority, service: Type[Service]):
139        super(LoopYieldManaged, self).__init__(interface, time_atom, default_priority, service)
140
141    def __call__(self, priority: Optional[CoroPriority] = None):
142        if priority is None:
143            priority = self.default_priority
144        
145        if priority != self.priority:
146            self.interface = None
147            self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority,
148                                                                                             self.priority))
149            self.priority = priority
150            try:
151                self.tracer = Tracer(self.time_atom.result)
152            except TimeLimitIsTooSmall as ex:
153                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
154        
155        if self.tracer is None:
156            exception = None
157            try:
158                self.tracer = Tracer(self.time_atom.result)
159            except TimeLimitIsTooSmall as ex:
160                exception = ex
161
162            if exception is not None:
163                try:
164                    self.tracer = Tracer(exception.min_time if exception.min_time is not None else MIN_TIME)
165                except TimeLimitIsTooSmall as ex:
166                    print(ex)
167                
168        if self.tracer is not None:
169            if not self.tracer.iter():
170                self.tracer = None
171                self.interface = None
172                self.interface(self.service)
LoopYieldManaged( interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, time_atom: cengal.code_flow_control.smart_values.versions.v_1.smart_values.ValueExistence, default_priority: CoroPriority, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service])
137    def __init__(self, interface: Interface, time_atom: ValueExistence,
138                 default_priority: CoroPriority, service: Type[Service]):
139        super(LoopYieldManaged, self).__init__(interface, time_atom, default_priority, service)
class LoopYieldManagedAsync(LoopYieldManagedBase):
175class LoopYieldManagedAsync(LoopYieldManagedBase):
176    def __init__(self, interface: Interface, time_atom: ValueExistence,
177                 default_priority: CoroPriority, service: Type[Service]):
178        super(LoopYieldManagedAsync, self).__init__(interface, time_atom, default_priority, service)
179
180    async def __call__(self, priority: Optional[CoroPriority] = None):
181        if priority is None:
182            priority = self.default_priority
183        if priority != self.priority:
184            self.interface = None
185            await self.interface(self.service, LoopYieldPrioritySchedulerRequest().change_priority(priority,
186                                                                                                   self.priority))
187            self.priority = priority
188            try:
189                self.tracer = Tracer(self.time_atom.result)
190            except TimeLimitIsTooSmall as ex:
191                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
192        if self.tracer is None:
193            try:
194                self.tracer = Tracer(self.time_atom.result)
195            except TimeLimitIsTooSmall as ex:
196                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
197        if not self.tracer.iter():
198            self.tracer = None
199            self.interface = None
200            await self.interface(self.service)
LoopYieldManagedAsync( interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, time_atom: cengal.code_flow_control.smart_values.versions.v_1.smart_values.ValueExistence, default_priority: CoroPriority, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service])
176    def __init__(self, interface: Interface, time_atom: ValueExistence,
177                 default_priority: CoroPriority, service: Type[Service]):
178        super(LoopYieldManagedAsync, self).__init__(interface, time_atom, default_priority, service)
class FakeLoopYieldManaged:
234class FakeLoopYieldManaged:
235    def __call__(self, priority: Optional[CoroPriority] = None):
236        pass
class LoopYieldPriorityScheduler(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[NoneType], cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
247class LoopYieldPriorityScheduler(TypedService[None], EntityStatsMixin):
248    def __init__(self, loop: CoroSchedulerType):
249        super(LoopYieldPriorityScheduler, self).__init__(loop)
250
251        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
252
253        self._request_workers = {
254            0: self._on_register,
255            1: self._on_setup,
256            2: self._on_change_priority,
257            3: self._on_get,
258            4: self._on_register_external,
259            5: self._on_register_external_asyncio_task,
260            6: self._on_change_priority_external,
261            7: self._on_del_external,
262            8: self._request_coro_kill,
263            9: self._kill_coro,
264        }
265
266        self.sigma = {
267            0: 0.6827,
268            1: 0.9545 - 0.6827,
269            2: 1.0 - 0.9545,
270        }
271
272        self.max_delay = 0.01
273        self.max_delays = {
274            0: 0.0,
275            1: 0.0,
276            2: 0.0,
277        }
278        self.compute_delays()
279
280        self.all_yield_objects = dict()  # type: Dict[CoroID, LoopYieldManagedBase]
281        self.task_counter = Counter()
282        while self.task_counter.get() <= 0:
283            pass
284        
285        self.yields_num: int = 0
286        self.coroutines_requested_to_be_deleted: Set[CoroID] = set()
287        self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict()
288        self.finished_waiters_for_coro_kill: Set[CoroID] = set()
289        self.asyncio_task_ids: Dict[Hashable, int] = dict()
290
291        self.yields_by_priority: Dict[CoroPriority, int] = {
292            CoroPriority.high:   0,
293            CoroPriority.normal: 0,
294            CoroPriority.low:    0,
295        }
296
297        self.time_atom_by_priority = {
298            CoroPriority.high:   ValueExistence(True, self.max_delays[0]),
299            CoroPriority.normal: ValueExistence(True, self.max_delays[1]),
300            CoroPriority.low:    ValueExistence(True, self.max_delays[2]),
301        }
302
303    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
304        coroutines_requested_to_be_killed = self.coroutines_requested_to_be_deleted | set(self.coroutines_requested_to_be_deleted_by_waiters.keys())
305        return type(self).__name__, {
306            'task counter': self.task_counter._index,
307            'yields num': self.yields_num,
308            'max_delay': self.max_delay,
309            'max_delays': self.max_delays,
310            'affected coroutines num': len(self.all_yield_objects),
311            'coroutines num by priority': {
312                'high': self.yields_by_priority[CoroPriority.high],
313                'normal': self.yields_by_priority[CoroPriority.normal],
314                'low': self.yields_by_priority[CoroPriority.low],
315            },
316            'time atoms by priority': {
317                'high': self.time_atom_by_priority[CoroPriority.high].result,
318                'normal': self.time_atom_by_priority[CoroPriority.normal].result,
319                'low': self.time_atom_by_priority[CoroPriority.low].result,
320            },
321            'coroutines requested to be killed': {
322                'num': len(coroutines_requested_to_be_killed),
323                'list': coroutines_requested_to_be_killed,
324            }
325        }
326
327    def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None
328                                                         ) -> Tuple[bool, Any, None]:
329        self.yields_num += 1
330        coro_should_be_killed = False
331        coro_id = self.current_caller_coro_info.coro_id
332        if coro_id in self.coroutines_requested_to_be_deleted:
333            coro_should_be_killed = True
334        
335        if coro_id in self.coroutines_requested_to_be_deleted_by_waiters:
336            self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id]
337            self.make_live()
338            coro_should_be_killed = True
339        
340        if request is not None:
341            if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]):
342                return True, None, ThisCoroWasRequestedToBeKilled
343                
344            return self.resolve_request(request)
345
346        if coro_should_be_killed:
347            return True, None, ThisCoroWasRequestedToBeKilled
348        
349        return True, None, None
350
351    def full_processing_iteration(self):
352        for coro_id in self.finished_waiters_for_coro_kill:
353            self.register_response(coro_id, None)
354        
355        self.compute_time_atoms()
356
357    def compute_time_atoms(self):
358        if self.all_yield_objects:
359            top_sigma = 0
360
361            if self.yields_by_priority[CoroPriority.high]:
362                top_sigma += 1
363
364            if self.yields_by_priority[CoroPriority.normal]:
365                top_sigma += 1
366
367            if self.yields_by_priority[CoroPriority.low]:
368                top_sigma += 1
369
370            median_time_atom = 1 / len(self.all_yield_objects)  # !!! Possible division by zero! Conditional must not be removed!
371            if 1 == top_sigma:
372                sigma_0_time_atom = min(median_time_atom, self.max_delays[0])
373                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
374                self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
375                self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom
376            elif 2 == top_sigma:
377                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
378                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
379                if self.yields_by_priority[CoroPriority.high]:
380                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
381                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
382                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
383                else:
384                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
385                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
386                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
387            elif 3 == top_sigma:
388                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
389                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
390                sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2])
391                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
392                self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
393                self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom
394        
395        self.make_dead()
396
397    def in_work(self) -> bool:
398        result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects)
399        return self.thrifty_in_work(result)
400
401    def compute_delays(self):
402        self.max_delays = {
403            0: self.max_delay * self.sigma[0],
404            1: self.max_delay * self.sigma[1],
405            2: self.max_delay * self.sigma[2],
406        }
407
408    def _on_register(self, default_priority: CoroPriority):
409        task_id = self.current_caller_coro_info.coro_id
410        if task_id in self.all_yield_objects:
411            loop_yield = self.all_yield_objects[task_id]
412        else:
413            interface: Interface = self.current_caller_coro_info.coro.interface
414            if isinstance(interface, InterfaceGreenlet):
415                loop_yield = LoopYieldManaged(interface,
416                                            self.time_atom_by_priority[default_priority],
417                                            default_priority,
418                                            type(self))
419            elif isinstance(interface, InterfaceAsyncAwait):
420                loop_yield = LoopYieldManagedAsync(interface,
421                                                self.time_atom_by_priority[default_priority],
422                                                default_priority,
423                                                type(self))
424            else:
425                raise NotImplementedError
426            
427            loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
428            self.all_yield_objects[interface.coro_id] = loop_yield
429            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
430            self.yields_by_priority[loop_yield.priority] += 1
431            self.make_live()
432        
433        return True, loop_yield, None
434
435    def _on_setup(self, max_delay: float):
436        self.max_delay = max_delay
437        self.compute_delays()
438        # self.compute_time_atoms()
439        self.make_live()
440        return True, None, None
441
442    def _on_change_priority(self, new_priority: CoroPriority, old_priority: CoroPriority):
443        self.yields_by_priority[old_priority] -= 1
444        self.yields_by_priority[new_priority] += 1
445        loop_yield = self.all_yield_objects[self.current_caller_coro_info.coro_id]
446        loop_yield.time_atom = self.time_atom_by_priority[new_priority]
447        self.make_live()
448        return True, None, None
449
450    def _on_get(self):
451        return True, self.all_yield_objects.get(self.current_caller_coro_info.coro_id), None
452
453    def get_yield_object(self, coro_id: CoroID) -> LoopYieldManagedBase:
454        return self.all_yield_objects.get(coro_id)
455    
456    def _on_register_external(self, asyncio_loop: asyncio.AbstractEventLoop, default_priority: CoroPriority):
457        task_id = -(self.task_counter.get())
458        loop_yield = LoopYieldManagedAsyncExternal(task_id,
459                                                   self.time_atom_by_priority[default_priority],
460                                                   default_priority,
461                                                   type(self),
462                                                   self._loop,
463                                                   asyncio_loop)
464        loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
465        self.all_yield_objects[task_id] = loop_yield
466        self.yields_by_priority[loop_yield.priority] += 1
467        self.make_live()
468        return True, loop_yield, None
469    
470    def _on_register_external_asyncio_task(self, asyncio_loop: asyncio.AbstractEventLoop, task: asyncio.Task, default_priority: CoroPriority):
471        asyncio_task_id = id(task)
472        if asyncio_task_id not in self.asyncio_task_ids:
473            self.asyncio_task_ids[asyncio_task_id] = -(self.task_counter.get())
474            
475        task_id = self.asyncio_task_ids[asyncio_task_id]
476        if task_id in self.all_yield_objects:
477            loop_yield = self.all_yield_objects[task_id]
478        else:
479            loop_yield = LoopYieldManagedAsyncExternal(task_id,
480                                                    self.time_atom_by_priority[default_priority],
481                                                    default_priority,
482                                                    type(self),
483                                                    self._loop,
484                                                    asyncio_loop)
485            def on_done_asyncio_coro(future):
486                self._on_del_external(loop_yield)
487                task.remove_done_callback(loop_yield.on_done_asyncio_coro)
488            
489            loop_yield.asyncio_task = task
490            loop_yield.on_done_asyncio_coro = on_done_asyncio_coro
491            task.add_done_callback(loop_yield.on_done_asyncio_coro)
492            loop_yield.time_atom = self.time_atom_by_priority[loop_yield.priority]
493            self.all_yield_objects[task_id] = loop_yield
494            self.yields_by_priority[loop_yield.priority] += 1
495            self.make_live()
496        
497        return True, loop_yield, None
498    
499    def _on_change_priority_external(self, loop_yield: LoopYieldManagedAsyncExternal, new_priority: CoroPriority, old_priority: CoroPriority):
500        self.yields_by_priority[old_priority] -= 1
501        self.yields_by_priority[new_priority] += 1
502        loop_yield.time_atom = self.time_atom_by_priority[new_priority]
503        self.make_live()
504        return True, None, None
505    
506    def _on_del_external(self, loop_yield: LoopYieldManagedAsyncExternal):
507        task_id = loop_yield.task_id
508        if task_id in self.all_yield_objects:
509            priority = self.all_yield_objects[task_id].priority
510            del self.all_yield_objects[task_id]
511            self.yields_by_priority[priority] -= 1
512        self.make_live()
513        return True, None, None
514    
515    def _request_coro_kill(self, coro_id: CoroID) -> ServiceProcessingResponse:
516        self.coroutines_requested_to_be_deleted.add(coro_id)
517        return True, None, None
518    
519    def _kill_coro(self, coro_id: CoroID) -> ServiceProcessingResponse:
520        waiter_coro_id = self.current_caller_coro_info
521        if coro_id not in self.coroutines_requested_to_be_deleted_by_waiters:
522            self.coroutines_requested_to_be_deleted_by_waiters[coro_id] = set()
523        
524        self.coroutines_requested_to_be_deleted_by_waiters[coro_id].add(waiter_coro_id)
525        return False, None, None
526
527    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
528        coro_id = coro.coro_id
529        if coro_id in self.all_yield_objects:
530            priority = self.all_yield_objects[coro_id].priority
531            del self.all_yield_objects[coro_id]
532            self.yields_by_priority[priority] -= 1
533            self.make_live()
534        return False
535
536    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
537        coro_id = coro.coro_id
538        priority = self.all_yield_objects[coro_id].priority
539        del self.all_yield_objects[coro_id]
540        self.yields_by_priority[priority] -= 1
541        self.make_live()
542        return False

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

LoopYieldPriorityScheduler( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
248    def __init__(self, loop: CoroSchedulerType):
249        super(LoopYieldPriorityScheduler, self).__init__(loop)
250
251        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
252
253        self._request_workers = {
254            0: self._on_register,
255            1: self._on_setup,
256            2: self._on_change_priority,
257            3: self._on_get,
258            4: self._on_register_external,
259            5: self._on_register_external_asyncio_task,
260            6: self._on_change_priority_external,
261            7: self._on_del_external,
262            8: self._request_coro_kill,
263            9: self._kill_coro,
264        }
265
266        self.sigma = {
267            0: 0.6827,
268            1: 0.9545 - 0.6827,
269            2: 1.0 - 0.9545,
270        }
271
272        self.max_delay = 0.01
273        self.max_delays = {
274            0: 0.0,
275            1: 0.0,
276            2: 0.0,
277        }
278        self.compute_delays()
279
280        self.all_yield_objects = dict()  # type: Dict[CoroID, LoopYieldManagedBase]
281        self.task_counter = Counter()
282        while self.task_counter.get() <= 0:
283            pass
284        
285        self.yields_num: int = 0
286        self.coroutines_requested_to_be_deleted: Set[CoroID] = set()
287        self.coroutines_requested_to_be_deleted_by_waiters: Dict[CoroID, Set[CoroID]] = dict()
288        self.finished_waiters_for_coro_kill: Set[CoroID] = set()
289        self.asyncio_task_ids: Dict[Hashable, int] = dict()
290
291        self.yields_by_priority: Dict[CoroPriority, int] = {
292            CoroPriority.high:   0,
293            CoroPriority.normal: 0,
294            CoroPriority.low:    0,
295        }
296
297        self.time_atom_by_priority = {
298            CoroPriority.high:   ValueExistence(True, self.max_delays[0]),
299            CoroPriority.normal: ValueExistence(True, self.max_delays[1]),
300            CoroPriority.low:    ValueExistence(True, self.max_delays[2]),
301        }
sigma
max_delay
max_delays
all_yield_objects
task_counter
yields_num: int
coroutines_requested_to_be_deleted: Set[int]
coroutines_requested_to_be_deleted_by_waiters: Dict[int, Set[int]]
finished_waiters_for_coro_kill: Set[int]
asyncio_task_ids: Dict[Hashable, int]
yields_by_priority: Dict[CoroPriority, int]
time_atom_by_priority
def single_task_registration_or_immediate_processing( self, request: typing.Union[LoopYieldPrioritySchedulerRequest, NoneType] = None) -> Tuple[bool, Any, NoneType]:
327    def single_task_registration_or_immediate_processing(self, request: Optional[LoopYieldPrioritySchedulerRequest]=None
328                                                         ) -> Tuple[bool, Any, None]:
329        self.yields_num += 1
330        coro_should_be_killed = False
331        coro_id = self.current_caller_coro_info.coro_id
332        if coro_id in self.coroutines_requested_to_be_deleted:
333            coro_should_be_killed = True
334        
335        if coro_id in self.coroutines_requested_to_be_deleted_by_waiters:
336            self.finished_waiters_for_coro_kill |= self.coroutines_requested_to_be_deleted_by_waiters[coro_id]
337            self.make_live()
338            coro_should_be_killed = True
339        
340        if request is not None:
341            if coro_should_be_killed and (self._on_del_external != self._request_workers[request.request_type]):
342                return True, None, ThisCoroWasRequestedToBeKilled
343                
344            return self.resolve_request(request)
345
346        if coro_should_be_killed:
347            return True, None, ThisCoroWasRequestedToBeKilled
348        
349        return True, None, None
def full_processing_iteration(self):
351    def full_processing_iteration(self):
352        for coro_id in self.finished_waiters_for_coro_kill:
353            self.register_response(coro_id, None)
354        
355        self.compute_time_atoms()
def compute_time_atoms(self):
357    def compute_time_atoms(self):
358        if self.all_yield_objects:
359            top_sigma = 0
360
361            if self.yields_by_priority[CoroPriority.high]:
362                top_sigma += 1
363
364            if self.yields_by_priority[CoroPriority.normal]:
365                top_sigma += 1
366
367            if self.yields_by_priority[CoroPriority.low]:
368                top_sigma += 1
369
370            median_time_atom = 1 / len(self.all_yield_objects)  # !!! Possible division by zero! Conditional must not be removed!
371            if 1 == top_sigma:
372                sigma_0_time_atom = min(median_time_atom, self.max_delays[0])
373                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
374                self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
375                self.time_atom_by_priority[CoroPriority.low].result = sigma_0_time_atom
376            elif 2 == top_sigma:
377                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
378                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
379                if self.yields_by_priority[CoroPriority.high]:
380                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
381                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
382                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
383                else:
384                    self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
385                    self.time_atom_by_priority[CoroPriority.normal].result = sigma_0_time_atom
386                    self.time_atom_by_priority[CoroPriority.low].result = sigma_1_time_atom
387            elif 3 == top_sigma:
388                sigma_0_time_atom = min(median_time_atom * self.sigma[0], self.max_delays[0])
389                sigma_1_time_atom = min(median_time_atom * self.sigma[1], self.max_delays[1])
390                sigma_2_time_atom = min(median_time_atom * self.sigma[2], self.max_delays[2])
391                self.time_atom_by_priority[CoroPriority.high].result = sigma_0_time_atom
392                self.time_atom_by_priority[CoroPriority.normal].result = sigma_1_time_atom
393                self.time_atom_by_priority[CoroPriority.low].result = sigma_2_time_atom
394        
395        self.make_dead()
def in_work(self) -> bool:
397    def in_work(self) -> bool:
398        result: bool = bool(self.finished_waiters_for_coro_kill) or bool(self.all_yield_objects)
399        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def compute_delays(self):
401    def compute_delays(self):
402        self.max_delays = {
403            0: self.max_delay * self.sigma[0],
404            1: self.max_delay * self.sigma[1],
405            2: self.max_delay * self.sigma[2],
406        }
def get_yield_object( self, coro_id: int) -> LoopYieldManagedBase:
453    def get_yield_object(self, coro_id: CoroID) -> LoopYieldManagedBase:
454        return self.all_yield_objects.get(coro_id)
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
get_entity_stats
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
def get_loop_yield( default_priority: CoroPriority = <CoroPriority.normal: 1>) -> Union[LoopYieldManaged, FakeLoopYieldManaged]:
548def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]:
549    loop = CoroScheduler.current_loop()
550    if loop is None:
551        return FakeLoopYieldManaged()  # running not from inside the loop
552
553    interface = loop.current_interface()
554    if interface is None:
555        return FakeLoopYieldManaged()  # running from Service
556
557    # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
558    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
559    if ly is None:
560        ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
561    
562    return ly
def gly( default_priority: CoroPriority = <CoroPriority.normal: 1>) -> Union[LoopYieldManaged, FakeLoopYieldManaged]:
548def get_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManaged, FakeLoopYieldManaged]:
549    loop = CoroScheduler.current_loop()
550    if loop is None:
551        return FakeLoopYieldManaged()  # running not from inside the loop
552
553    interface = loop.current_interface()
554    if interface is None:
555        return FakeLoopYieldManaged()  # running from Service
556
557    # ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
558    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
559    if ly is None:
560        ly = interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
561    
562    return ly
async def aget_loop_yield( default_priority: CoroPriority = <CoroPriority.normal: 1>) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]:
568async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]:
569    loop = CoroScheduler.current_loop()
570    if loop is None:
571        return FakeLoopYieldManagedAsync()  # running not from inside the loop
572
573    interface = loop.current_interface()
574    if interface is None:
575        return FakeLoopYieldManagedAsync()  # running from Service
576
577    # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
578    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
579    if ly is None:
580        ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
581    
582    return ly
async def agly( default_priority: CoroPriority = <CoroPriority.normal: 1>) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]:
568async def aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal) -> Union[LoopYieldManagedAsync, FakeLoopYieldManagedAsync]:
569    loop = CoroScheduler.current_loop()
570    if loop is None:
571        return FakeLoopYieldManagedAsync()  # running not from inside the loop
572
573    interface = loop.current_interface()
574    if interface is None:
575        return FakeLoopYieldManagedAsync()  # running from Service
576
577    # ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().get())
578    ly = interface._loop.get_service_instance(LoopYieldPriorityScheduler).get_yield_object(interface.coro_id)
579    if ly is None:
580        ly = await interface(LoopYieldPriorityScheduler, LoopYieldPrioritySchedulerRequest().register(default_priority))
581    
582    return ly
class LoopYieldManagedAsyncExternal(LoopYieldManagedBase):
203class LoopYieldManagedAsyncExternal(LoopYieldManagedBase):
204    def __init__(self, task_id: int, time_atom: ValueExistence,
205                 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop):
206        super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service)
207        self.task_id = task_id
208        self.coro_scheduler = coro_scheduler
209        self.asyncio_loop = asyncio_loop
210        self.asyncio_task: asyncio.Task = None
211        self.on_done_asyncio_coro: Callable = None
212
213    async def __call__(self, priority: Optional[CoroPriority] = None):
214        if priority is None:
215            priority = self.default_priority
216        if priority != self.priority:
217            await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service,
218                                  LoopYieldPrioritySchedulerRequest().change_priority_external(self, priority, self.priority))
219            self.priority = priority
220            try:
221                self.tracer = Tracer(self.time_atom.result)
222            except TimeLimitIsTooSmall as ex:
223                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
224        if self.tracer is None:
225            try:
226                self.tracer = Tracer(self.time_atom.result)
227            except TimeLimitIsTooSmall as ex:
228                self.tracer = Tracer(ex.min_time if ex.min_time is not None else MIN_TIME)
229        if not self.tracer.iter():
230            self.tracer = None
231            await await_task_fast(self.asyncio_loop, self.coro_scheduler, self.service)
LoopYieldManagedAsyncExternal( task_id: int, time_atom: cengal.code_flow_control.smart_values.versions.v_1.smart_values.ValueExistence, default_priority: CoroPriority, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], asyncio_loop: asyncio.events.AbstractEventLoop)
204    def __init__(self, task_id: int, time_atom: ValueExistence,
205                 default_priority: CoroPriority, service: Type[Service], coro_scheduler: CoroSchedulerType, asyncio_loop: asyncio.AbstractEventLoop):
206        super(LoopYieldManagedAsyncExternal, self).__init__(None, time_atom, default_priority, service)
207        self.task_id = task_id
208        self.coro_scheduler = coro_scheduler
209        self.asyncio_loop = asyncio_loop
210        self.asyncio_task: asyncio.Task = None
211        self.on_done_asyncio_coro: Callable = None
task_id
coro_scheduler
asyncio_loop
asyncio_task: _asyncio.Task
on_done_asyncio_coro: Callable
class FakeLoopYieldManagedAsync:
238class FakeLoopYieldManagedAsync:
239    async def __call__(self, priority: Optional[CoroPriority] = None):
240        pass
@asynccontextmanager
@async_generator
def external_aget_loop_yield( default_priority: CoroPriority = <CoroPriority.normal: 1>, coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType] = None, asyncio_loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType] = None):
588@asynccontextmanager
589@async_generator
590async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None):
591    if coro_scheduler is None:
592        coro_scheduler = CoroScheduler.current_loop()
593    
594    if coro_scheduler is None:
595        await yield_(FakeLoopYieldManagedAsync())  # can not determine coro scheduler loop
596    else:
597        if asyncio_loop is None:
598            asyncio_loop = asyncio.get_event_loop()
599        
600        if (3, 7) <= sys.version_info:
601            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
602                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
603                LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority))
604        else:
605            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
606                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
607                LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority))
608
609        try:
610            await yield_(ly)
611        finally:
612            if (3, 7) > sys.version_info:
613                await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
614                                    LoopYieldPrioritySchedulerRequest().del_external(ly))
@asynccontextmanager
@async_generator
def eagly( default_priority: CoroPriority = <CoroPriority.normal: 1>, coro_scheduler: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable, NoneType] = None, asyncio_loop: typing.Union[asyncio.events.AbstractEventLoop, NoneType] = None):
588@asynccontextmanager
589@async_generator
590async def external_aget_loop_yield(default_priority: CoroPriority = CoroPriority.normal, coro_scheduler: Optional[CoroSchedulerType]=None, asyncio_loop: Optional[asyncio.AbstractEventLoop]=None):
591    if coro_scheduler is None:
592        coro_scheduler = CoroScheduler.current_loop()
593    
594    if coro_scheduler is None:
595        await yield_(FakeLoopYieldManagedAsync())  # can not determine coro scheduler loop
596    else:
597        if asyncio_loop is None:
598            asyncio_loop = asyncio.get_event_loop()
599        
600        if (3, 7) <= sys.version_info:
601            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
602                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
603                LoopYieldPrioritySchedulerRequest().register_external_asyncio_task(asyncio_loop, asyncio.current_task(loop=asyncio_loop), default_priority))
604        else:
605            ly: LoopYieldManagedAsyncExternal = await await_task_fast(
606                asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
607                LoopYieldPrioritySchedulerRequest().register_external(asyncio_loop, default_priority))
608
609        try:
610            await yield_(ly)
611        finally:
612            if (3, 7) > sys.version_info:
613                await await_task_fast(asyncio_loop, coro_scheduler, LoopYieldPriorityScheduler,
614                                    LoopYieldPrioritySchedulerRequest().del_external(ly))