cengal.parallel_execution.coroutines.coro_standard_services.sleep.versions.v_0.sleep
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = ['Sleep'] 38 39from cengal.parallel_execution.coroutines.coro_scheduler import * 40from cengal.time_management.cpu_clock_cycles import perf_counter 41from cengal.time_management.timer import Timer 42from functools import partial 43from typing import Optional, Tuple, Union 44 45 46class Sleep(TypedService[float]): 47 def __init__(self, loop: CoroSchedulerType): 48 super(Sleep, self).__init__(loop) 49 self.timer = Timer() 50 self.pending_tasks_number = 0 51 self.pending_foreground_tasks_number = 0 52 53 def single_task_registration_or_immediate_processing( 54 self, delay: float) -> Tuple[bool, None, None]: 55 def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float): 56 current_time = perf_counter() 57 if start_time > current_time: 58 start_time = current_time 59 real_delay = current_time - start_time 60 self.task_triggered(foreground) 61 self.register_response(coro_id, real_delay, None) 62 63 foreground: bool = not self.current_caller_coro_info.coro.is_background_coro 64 handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter()) 65 self.task_added(foreground) 66 self.timer.register(handler, delay) 67 self.make_live() 68 return False, None, None 69 70 def full_processing_iteration(self): 71 self.timer() 72 if 0 == self.pending_tasks_number: 73 self.make_dead() 74 75 def task_added(self, foreground: bool): 76 self.pending_tasks_number += 1 77 self.pending_foreground_tasks_number += 1 if foreground else 0 78 79 def task_triggered(self, foreground: bool): 80 self.pending_tasks_number -= 1 81 self.pending_foreground_tasks_number -= 1 if foreground else 0 82 83 def in_work(self) -> bool: 84 return self.thrifty_in_work(self.pending_tasks_number != 0) 85 86 def in_forground_work(self) -> bool: 87 return self.pending_foreground_tasks_number 88 89 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 90 return self.pending_tasks_number != 0, self.timer.nearest_event()
47class Sleep(TypedService[float]): 48 def __init__(self, loop: CoroSchedulerType): 49 super(Sleep, self).__init__(loop) 50 self.timer = Timer() 51 self.pending_tasks_number = 0 52 self.pending_foreground_tasks_number = 0 53 54 def single_task_registration_or_immediate_processing( 55 self, delay: float) -> Tuple[bool, None, None]: 56 def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float): 57 current_time = perf_counter() 58 if start_time > current_time: 59 start_time = current_time 60 real_delay = current_time - start_time 61 self.task_triggered(foreground) 62 self.register_response(coro_id, real_delay, None) 63 64 foreground: bool = not self.current_caller_coro_info.coro.is_background_coro 65 handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter()) 66 self.task_added(foreground) 67 self.timer.register(handler, delay) 68 self.make_live() 69 return False, None, None 70 71 def full_processing_iteration(self): 72 self.timer() 73 if 0 == self.pending_tasks_number: 74 self.make_dead() 75 76 def task_added(self, foreground: bool): 77 self.pending_tasks_number += 1 78 self.pending_foreground_tasks_number += 1 if foreground else 0 79 80 def task_triggered(self, foreground: bool): 81 self.pending_tasks_number -= 1 82 self.pending_foreground_tasks_number -= 1 if foreground else 0 83 84 def in_work(self) -> bool: 85 return self.thrifty_in_work(self.pending_tasks_number != 0) 86 87 def in_forground_work(self) -> bool: 88 return self.pending_foreground_tasks_number 89 90 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 91 return self.pending_tasks_number != 0, self.timer.nearest_event()
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
54 def single_task_registration_or_immediate_processing( 55 self, delay: float) -> Tuple[bool, None, None]: 56 def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float): 57 current_time = perf_counter() 58 if start_time > current_time: 59 start_time = current_time 60 real_delay = current_time - start_time 61 self.task_triggered(foreground) 62 self.register_response(coro_id, real_delay, None) 63 64 foreground: bool = not self.current_caller_coro_info.coro.is_background_coro 65 handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter()) 66 self.task_added(foreground) 67 self.timer.register(handler, delay) 68 self.make_live() 69 return False, None, None
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- thrifty_in_work
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy