cengal.parallel_execution.coroutines.coro_standard_services.watchdog.versions.v_0.watchdog

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18"""
 19Module Docstring
 20Docstrings: http://www.python.org/dev/peps/pep-0257/
 21"""
 22
 23__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 26__license__ = "Apache License, Version 2.0"
 27__version__ = "4.4.1"
 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 29__email__ = "gtalk@butenkoms.space"
 30# __status__ = "Prototype"
 31__status__ = "Development"
 32# __status__ = "Production"
 33
 34
 35__all__ = ['WatchdogTimeoutError', 'Watchdog']
 36
 37
 38from cengal.parallel_execution.coroutines.coro_scheduler import *
 39import signal
 40from typing import Tuple, Optional
 41from threading import current_thread, main_thread, Thread
 42from cengal.time_management.sleep_tools import sleep
 43from cengal.math.numbers import RationalNumber
 44from signal import SIG_IGN, SIGUSR1, raise_signal, signal
 45from cengal.system import OS_API_TYPE
 46
 47
 48class WatchdogTimeoutError(Exception):
 49    pass
 50
 51
 52class HungedCoroInfo:
 53    def __init__(self, interface: Interface, coro_start_time: RationalNumber):
 54        self.interface: Interface = interface
 55        self.coro_start_time: RationalNumber = coro_start_time
 56
 57
 58class Watchdog(TypedService[None]):
 59    def __init__(self, loop: CoroSchedulerType):
 60        super().__init__(loop)
 61        self.handler_set: bool = False
 62        self.previous_handler = None
 63        self.keyboard_interrupt_emited: bool = False
 64        self.period: RationalNumber = 5
 65        if 'nt' == OS_API_TYPE:
 66            self.raised_signal = SIG_IGN
 67        else:
 68            self.raised_signal = SIGUSR1
 69        
 70        self.hunged_coro: Optional[HungedCoroInfo] = None
 71        self.watchdog_thread: Optional[Thread] = None
 72        self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_'
 73        self.watchdog_thread_allowed: bool = True
 74        self.watchdog_thread_in_work: bool = False
 75
 76    def watchdog_thread_func(self):
 77        while self.watchdog_thread_allowed and (not self._loop._destroyed):
 78            sleep(self.period)
 79            self.watchdog_thread_in_work = True
 80            try:
 81                self.check_idle_coro()
 82            finally:
 83                self.watchdog_thread_in_work = False
 84    
 85    def check_idle_coro(self):
 86        loop: CoroSchedulerType = self._loop
 87        if get_current_coro_scheduler() is not loop:
 88            return
 89        
 90        curr_interface: Optional[Interface] = current_interface()
 91        if curr_interface is None:
 92            return
 93
 94        if curr_interface.ignored_by_watchdog:
 95            return
 96
 97        if loop.current_coro_start_time is None:
 98            return
 99
100        if self.hunged_coro is not None:
101            return
102
103        coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time
104        if coro_execution_piece_delta_time > self.period:
105            raise_signal(self.raised_signal)
106    
107    def is_in_main_thread(self):
108        return current_thread() is main_thread()
109
110    def single_task_registration_or_immediate_processing(
111            self, period: RationalNumber) -> Tuple[bool, None, None]:
112        result = False
113        if self.is_in_main_thread():
114            self._loop.set_coro_time_measurement(True)
115            if not self.handler_set:
116                self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler)
117                self.handler_set = True
118            
119            if self.watchdog_thread is None:
120                self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True)
121                self.watchdog_thread.start()
122
123            self.period = period
124            result = True
125        
126        return result, None, None
127
128    def full_processing_iteration(self):
129        self.make_dead()
130
131    def in_work(self) -> bool:
132        result: bool = self.keyboard_interrupt_emited
133        return self.thrifty_in_work(result)
134    
135    def interrupt_handler(self, sig, frame):
136        if self.hunged_coro is None:
137            return
138        
139        loop: CoroSchedulerType = self._loop
140        if get_current_coro_scheduler() is not loop:
141            return
142        
143        if current_interface() is not self.hunged_coro.interface:
144            return
145
146        if loop.current_coro_start_time != self.hunged_coro.coro_start_time:
147            return
148
149        try:
150            raise WatchdogTimeoutError
151        finally:
152            self.hunged_coro = None
153    
154    def destroy(self):
155        if self.watchdog_thread is not None:
156            self.watchdog_thread_allowed = False
157            while self.watchdog_thread_in_work:
158                sleep(0.0001, high_cpu_utilisation_mode=True)
159            
160            # self.watchdog_thread.join()  # we don ot need this since it is a daemon thread
161            self.watchdog_thread = None
162
163        if self.handler_set:
164            signal.signal(self.raised_signal, self.previous_handler)
165            self.handler_set = False
class WatchdogTimeoutError(builtins.Exception):
class WatchdogTimeoutError(Exception):
    """Raised by ``Watchdog.interrupt_handler`` when the flagged coroutine is still running."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Watchdog(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[NoneType]):
 59class Watchdog(TypedService[None]):
 60    def __init__(self, loop: CoroSchedulerType):
 61        super().__init__(loop)
 62        self.handler_set: bool = False
 63        self.previous_handler = None
 64        self.keyboard_interrupt_emited: bool = False
 65        self.period: RationalNumber = 5
 66        if 'nt' == OS_API_TYPE:
 67            self.raised_signal = SIG_IGN
 68        else:
 69            self.raised_signal = SIGUSR1
 70        
 71        self.hunged_coro: Optional[HungedCoroInfo] = None
 72        self.watchdog_thread: Optional[Thread] = None
 73        self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_'
 74        self.watchdog_thread_allowed: bool = True
 75        self.watchdog_thread_in_work: bool = False
 76
 77    def watchdog_thread_func(self):
 78        while self.watchdog_thread_allowed and (not self._loop._destroyed):
 79            sleep(self.period)
 80            self.watchdog_thread_in_work = True
 81            try:
 82                self.check_idle_coro()
 83            finally:
 84                self.watchdog_thread_in_work = False
 85    
 86    def check_idle_coro(self):
 87        loop: CoroSchedulerType = self._loop
 88        if get_current_coro_scheduler() is not loop:
 89            return
 90        
 91        curr_interface: Optional[Interface] = current_interface()
 92        if curr_interface is None:
 93            return
 94
 95        if curr_interface.ignored_by_watchdog:
 96            return
 97
 98        if loop.current_coro_start_time is None:
 99            return
100
101        if self.hunged_coro is not None:
102            return
103
104        coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time
105        if coro_execution_piece_delta_time > self.period:
106            raise_signal(self.raised_signal)
107    
108    def is_in_main_thread(self):
109        return current_thread() is main_thread()
110
111    def single_task_registration_or_immediate_processing(
112            self, period: RationalNumber) -> Tuple[bool, None, None]:
113        result = False
114        if self.is_in_main_thread():
115            self._loop.set_coro_time_measurement(True)
116            if not self.handler_set:
117                self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler)
118                self.handler_set = True
119            
120            if self.watchdog_thread is None:
121                self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True)
122                self.watchdog_thread.start()
123
124            self.period = period
125            result = True
126        
127        return result, None, None
128
129    def full_processing_iteration(self):
130        self.make_dead()
131
132    def in_work(self) -> bool:
133        result: bool = self.keyboard_interrupt_emited
134        return self.thrifty_in_work(result)
135    
136    def interrupt_handler(self, sig, frame):
137        if self.hunged_coro is None:
138            return
139        
140        loop: CoroSchedulerType = self._loop
141        if get_current_coro_scheduler() is not loop:
142            return
143        
144        if current_interface() is not self.hunged_coro.interface:
145            return
146
147        if loop.current_coro_start_time != self.hunged_coro.coro_start_time:
148            return
149
150        try:
151            raise WatchdogTimeoutError
152        finally:
153            self.hunged_coro = None
154    
155    def destroy(self):
156        if self.watchdog_thread is not None:
157            self.watchdog_thread_allowed = False
158            while self.watchdog_thread_in_work:
159                sleep(0.0001, high_cpu_utilisation_mode=True)
160            
161            # self.watchdog_thread.join()  # we don ot need this since it is a daemon thread
162            self.watchdog_thread = None
163
164        if self.handler_set:
165            signal.signal(self.raised_signal, self.previous_handler)
166            self.handler_set = False

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Watchdog( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
60    def __init__(self, loop: CoroSchedulerType):
61        super().__init__(loop)
62        self.handler_set: bool = False
63        self.previous_handler = None
64        self.keyboard_interrupt_emited: bool = False
65        self.period: RationalNumber = 5
66        if 'nt' == OS_API_TYPE:
67            self.raised_signal = SIG_IGN
68        else:
69            self.raised_signal = SIGUSR1
70        
71        self.hunged_coro: Optional[HungedCoroInfo] = None
72        self.watchdog_thread: Optional[Thread] = None
73        self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_'
74        self.watchdog_thread_allowed: bool = True
75        self.watchdog_thread_in_work: bool = False
handler_set: bool
previous_handler
keyboard_interrupt_emited: bool
period: Union[int, float]
hunged_coro: Union[cengal.parallel_execution.coroutines.coro_standard_services.watchdog.versions.v_0.watchdog.HungedCoroInfo, NoneType]
watchdog_thread: Union[threading.Thread, NoneType]
watchdog_thread_name: str
watchdog_thread_allowed: bool
watchdog_thread_in_work: bool
def watchdog_thread_func(self):
77    def watchdog_thread_func(self):
78        while self.watchdog_thread_allowed and (not self._loop._destroyed):
79            sleep(self.period)
80            self.watchdog_thread_in_work = True
81            try:
82                self.check_idle_coro()
83            finally:
84                self.watchdog_thread_in_work = False
def check_idle_coro(self):
 86    def check_idle_coro(self):
 87        loop: CoroSchedulerType = self._loop
 88        if get_current_coro_scheduler() is not loop:
 89            return
 90        
 91        curr_interface: Optional[Interface] = current_interface()
 92        if curr_interface is None:
 93            return
 94
 95        if curr_interface.ignored_by_watchdog:
 96            return
 97
 98        if loop.current_coro_start_time is None:
 99            return
100
101        if self.hunged_coro is not None:
102            return
103
104        coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time
105        if coro_execution_piece_delta_time > self.period:
106            raise_signal(self.raised_signal)
def is_in_main_thread(self):
108    def is_in_main_thread(self):
109        return current_thread() is main_thread()
def single_task_registration_or_immediate_processing(self, period: Union[int, float]) -> Tuple[bool, NoneType, NoneType]:
111    def single_task_registration_or_immediate_processing(
112            self, period: RationalNumber) -> Tuple[bool, None, None]:
113        result = False
114        if self.is_in_main_thread():
115            self._loop.set_coro_time_measurement(True)
116            if not self.handler_set:
117                self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler)
118                self.handler_set = True
119            
120            if self.watchdog_thread is None:
121                self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True)
122                self.watchdog_thread.start()
123
124            self.period = period
125            result = True
126        
127        return result, None, None
def full_processing_iteration(self):
129    def full_processing_iteration(self):
130        self.make_dead()
def in_work(self) -> bool:
132    def in_work(self) -> bool:
133        result: bool = self.keyboard_interrupt_emited
134        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: the result of thrifty_in_work() applied to keyboard_interrupt_emited.

def interrupt_handler(self, sig, frame):
136    def interrupt_handler(self, sig, frame):
137        if self.hunged_coro is None:
138            return
139        
140        loop: CoroSchedulerType = self._loop
141        if get_current_coro_scheduler() is not loop:
142            return
143        
144        if current_interface() is not self.hunged_coro.interface:
145            return
146
147        if loop.current_coro_start_time != self.hunged_coro.coro_start_time:
148            return
149
150        try:
151            raise WatchdogTimeoutError
152        finally:
153            self.hunged_coro = None
def destroy(self):
155    def destroy(self):
156        if self.watchdog_thread is not None:
157            self.watchdog_thread_allowed = False
158            while self.watchdog_thread_in_work:
159                sleep(0.0001, high_cpu_utilisation_mode=True)
160            
161            # self.watchdog_thread.join()  # we don ot need this since it is a daemon thread
162            self.watchdog_thread = None
163
164        if self.handler_set:
165            signal.signal(self.raised_signal, self.previous_handler)
166            self.handler_set = False
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id