cengal.parallel_execution.coroutines.coro_standard_services.sleep.versions.v_0.sleep

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

 1#!/usr/bin/env python
 2# coding=utf-8
 3
 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
 5# 
 6# Licensed under the Apache License, Version 2.0 (the "License");
 7# you may not use this file except in compliance with the License.
 8# You may obtain a copy of the License at
 9# 
10#     http://www.apache.org/licenses/LICENSE-2.0
11# 
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18
19"""
20Module Docstring
21Docstrings: http://www.python.org/dev/peps/pep-0257/
22"""
23
24
25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
28__license__ = "Apache License, Version 2.0"
29__version__ = "4.4.1"
30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
31__email__ = "gtalk@butenkoms.space"
32# __status__ = "Prototype"
33__status__ = "Development"
34# __status__ = "Production"
35
36
37__all__ = ['Sleep']
38
39from cengal.parallel_execution.coroutines.coro_scheduler import *
40from cengal.time_management.cpu_clock_cycles import perf_counter
41from cengal.time_management.timer import Timer
42from functools import partial
43from typing import Optional, Tuple, Union
44
45
46class Sleep(TypedService[float]):
47    def __init__(self, loop: CoroSchedulerType):
48        super(Sleep, self).__init__(loop)
49        self.timer = Timer()
50        self.pending_tasks_number = 0
51        self.pending_foreground_tasks_number = 0
52
53    def single_task_registration_or_immediate_processing(
54            self, delay: float) -> Tuple[bool, None, None]:
55        def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float):
56            current_time = perf_counter()
57            if start_time > current_time:
58                start_time = current_time
59            real_delay = current_time - start_time
60            self.task_triggered(foreground)
61            self.register_response(coro_id, real_delay, None)
62
63        foreground: bool = not self.current_caller_coro_info.coro.is_background_coro
64        handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter())
65        self.task_added(foreground)
66        self.timer.register(handler, delay)
67        self.make_live()
68        return False, None, None
69
70    def full_processing_iteration(self):
71        self.timer()
72        if 0 == self.pending_tasks_number:
73            self.make_dead()
74
75    def task_added(self, foreground: bool):
76        self.pending_tasks_number += 1
77        self.pending_foreground_tasks_number += 1 if foreground else 0
78
79    def task_triggered(self, foreground: bool):
80        self.pending_tasks_number -= 1
81        self.pending_foreground_tasks_number -= 1 if foreground else 0
82
83    def in_work(self) -> bool:
84        return self.thrifty_in_work(self.pending_tasks_number != 0)
85    
86    def in_forground_work(self) -> bool:
87        return self.pending_foreground_tasks_number
88    
89    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
90        return self.pending_tasks_number != 0, self.timer.nearest_event()
class Sleep(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[float]):
47class Sleep(TypedService[float]):
48    def __init__(self, loop: CoroSchedulerType):
49        super(Sleep, self).__init__(loop)
50        self.timer = Timer()
51        self.pending_tasks_number = 0
52        self.pending_foreground_tasks_number = 0
53
54    def single_task_registration_or_immediate_processing(
55            self, delay: float) -> Tuple[bool, None, None]:
56        def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float):
57            current_time = perf_counter()
58            if start_time > current_time:
59                start_time = current_time
60            real_delay = current_time - start_time
61            self.task_triggered(foreground)
62            self.register_response(coro_id, real_delay, None)
63
64        foreground: bool = not self.current_caller_coro_info.coro.is_background_coro
65        handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter())
66        self.task_added(foreground)
67        self.timer.register(handler, delay)
68        self.make_live()
69        return False, None, None
70
71    def full_processing_iteration(self):
72        self.timer()
73        if 0 == self.pending_tasks_number:
74            self.make_dead()
75
76    def task_added(self, foreground: bool):
77        self.pending_tasks_number += 1
78        self.pending_foreground_tasks_number += 1 if foreground else 0
79
80    def task_triggered(self, foreground: bool):
81        self.pending_tasks_number -= 1
82        self.pending_foreground_tasks_number -= 1 if foreground else 0
83
84    def in_work(self) -> bool:
85        return self.thrifty_in_work(self.pending_tasks_number != 0)
86    
87    def in_forground_work(self) -> bool:
88        return self.pending_foreground_tasks_number
89    
90    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
91        return self.pending_tasks_number != 0, self.timer.nearest_event()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Sleep( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
48    def __init__(self, loop: CoroSchedulerType):
49        super(Sleep, self).__init__(loop)
50        self.timer = Timer()
51        self.pending_tasks_number = 0
52        self.pending_foreground_tasks_number = 0
timer
pending_tasks_number
pending_foreground_tasks_number
def single_task_registration_or_immediate_processing(self, delay: float) -> Tuple[bool, NoneType, NoneType]:
54    def single_task_registration_or_immediate_processing(
55            self, delay: float) -> Tuple[bool, None, None]:
56        def timer_handler_func(coro_id: CoroID, foreground: bool, start_time: float):
57            current_time = perf_counter()
58            if start_time > current_time:
59                start_time = current_time
60            real_delay = current_time - start_time
61            self.task_triggered(foreground)
62            self.register_response(coro_id, real_delay, None)
63
64        foreground: bool = not self.current_caller_coro_info.coro.is_background_coro
65        handler = partial(timer_handler_func, self.current_caller_coro_info.coro_id, foreground, perf_counter())
66        self.task_added(foreground)
67        self.timer.register(handler, delay)
68        self.make_live()
69        return False, None, None
def full_processing_iteration(self):
71    def full_processing_iteration(self):
72        self.timer()
73        if 0 == self.pending_tasks_number:
74            self.make_dead()
def task_added(self, foreground: bool):
76    def task_added(self, foreground: bool):
77        self.pending_tasks_number += 1
78        self.pending_foreground_tasks_number += 1 if foreground else 0
def task_triggered(self, foreground: bool):
80    def task_triggered(self, foreground: bool):
81        self.pending_tasks_number -= 1
82        self.pending_foreground_tasks_number -= 1 if foreground else 0
def in_work(self) -> bool:
84    def in_work(self) -> bool:
85        return self.thrifty_in_work(self.pending_tasks_number != 0)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def in_forground_work(self) -> bool:
87    def in_forground_work(self) -> bool:
88        return self.pending_foreground_tasks_number
def time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
90    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
91        return self.pending_tasks_number != 0, self.timer.nearest_event()
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
thrifty_in_work
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy