cengal.parallel_execution.coroutines.coro_standard_services.watchdog.versions.v_0.watchdog
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18""" 19Module Docstring 20Docstrings: http://www.python.org/dev/peps/pep-0257/ 21""" 22 23__author__ = "ButenkoMS <gtalk@butenkoms.space>" 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 26__license__ = "Apache License, Version 2.0" 27__version__ = "4.4.1" 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 29__email__ = "gtalk@butenkoms.space" 30# __status__ = "Prototype" 31__status__ = "Development" 32# __status__ = "Production" 33 34 35__all__ = ['WatchdogTimeoutError', 'Watchdog'] 36 37 38from cengal.parallel_execution.coroutines.coro_scheduler import * 39import signal 40from typing import Tuple, Optional 41from threading import current_thread, main_thread, Thread 42from cengal.time_management.sleep_tools import sleep 43from cengal.math.numbers import RationalNumber 44from signal import SIG_IGN, SIGUSR1, raise_signal, signal 45from cengal.system import OS_API_TYPE 46 47 48class WatchdogTimeoutError(Exception): 49 pass 50 51 52class HungedCoroInfo: 53 def __init__(self, interface: Interface, coro_start_time: RationalNumber): 54 self.interface: Interface = interface 55 self.coro_start_time: RationalNumber = coro_start_time 56 57 58class Watchdog(TypedService[None]): 59 def __init__(self, loop: CoroSchedulerType): 60 super().__init__(loop) 61 self.handler_set: bool = False 62 self.previous_handler = None 63 self.keyboard_interrupt_emited: bool = False 64 self.period: RationalNumber = 5 65 if 'nt' == OS_API_TYPE: 66 self.raised_signal = SIG_IGN 67 else: 68 self.raised_signal = SIGUSR1 69 70 self.hunged_coro: Optional[HungedCoroInfo] = None 71 self.watchdog_thread: Optional[Thread] = None 72 self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_' 73 self.watchdog_thread_allowed: bool = True 74 self.watchdog_thread_in_work: bool = False 75 76 def watchdog_thread_func(self): 77 while self.watchdog_thread_allowed and (not self._loop._destroyed): 78 sleep(self.period) 79 self.watchdog_thread_in_work = True 80 try: 81 self.check_idle_coro() 82 finally: 83 self.watchdog_thread_in_work = False 84 85 def check_idle_coro(self): 86 loop: CoroSchedulerType = self._loop 87 if get_current_coro_scheduler() is not loop: 88 return 89 90 curr_interface: Optional[Interface] = current_interface() 91 if curr_interface is None: 92 return 93 94 if curr_interface.ignored_by_watchdog: 95 return 96 97 if loop.current_coro_start_time is None: 98 return 99 100 if self.hunged_coro is not None: 101 return 102 103 coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time 104 if coro_execution_piece_delta_time > self.period: 105 raise_signal(self.raised_signal) 106 107 def is_in_main_thread(self): 108 return current_thread() is main_thread() 109 110 def single_task_registration_or_immediate_processing( 111 self, period: RationalNumber) -> Tuple[bool, None, None]: 112 result = False 113 if self.is_in_main_thread(): 114 self._loop.set_coro_time_measurement(True) 115 if not self.handler_set: 116 self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler) 117 self.handler_set = True 118 119 if self.watchdog_thread is None: 120 self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True) 121 self.watchdog_thread.start() 122 123 self.period = period 124 result = True 125 126 return result, None, None 127 128 def full_processing_iteration(self): 129 self.make_dead() 130 131 def in_work(self) -> bool: 132 result: bool = self.keyboard_interrupt_emited 133 return self.thrifty_in_work(result) 134 135 def interrupt_handler(self, sig, frame): 136 if self.hunged_coro is None: 137 return 138 139 loop: CoroSchedulerType = self._loop 140 if get_current_coro_scheduler() is not loop: 141 return 142 143 if current_interface() is not self.hunged_coro.interface: 144 return 145 146 if loop.current_coro_start_time != self.hunged_coro.coro_start_time: 147 return 148 149 try: 150 raise WatchdogTimeoutError 151 finally: 152 self.hunged_coro = None 153 154 def destroy(self): 155 if self.watchdog_thread is not None: 156 self.watchdog_thread_allowed = False 157 while self.watchdog_thread_in_work: 158 sleep(0.0001, high_cpu_utilisation_mode=True) 159 160 # self.watchdog_thread.join() # we don ot need this since it is a daemon thread 161 self.watchdog_thread = None 162 163 if self.handler_set: 164 signal.signal(self.raised_signal, self.previous_handler) 165 self.handler_set = False
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
59class Watchdog(TypedService[None]): 60 def __init__(self, loop: CoroSchedulerType): 61 super().__init__(loop) 62 self.handler_set: bool = False 63 self.previous_handler = None 64 self.keyboard_interrupt_emited: bool = False 65 self.period: RationalNumber = 5 66 if 'nt' == OS_API_TYPE: 67 self.raised_signal = SIG_IGN 68 else: 69 self.raised_signal = SIGUSR1 70 71 self.hunged_coro: Optional[HungedCoroInfo] = None 72 self.watchdog_thread: Optional[Thread] = None 73 self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_' 74 self.watchdog_thread_allowed: bool = True 75 self.watchdog_thread_in_work: bool = False 76 77 def watchdog_thread_func(self): 78 while self.watchdog_thread_allowed and (not self._loop._destroyed): 79 sleep(self.period) 80 self.watchdog_thread_in_work = True 81 try: 82 self.check_idle_coro() 83 finally: 84 self.watchdog_thread_in_work = False 85 86 def check_idle_coro(self): 87 loop: CoroSchedulerType = self._loop 88 if get_current_coro_scheduler() is not loop: 89 return 90 91 curr_interface: Optional[Interface] = current_interface() 92 if curr_interface is None: 93 return 94 95 if curr_interface.ignored_by_watchdog: 96 return 97 98 if loop.current_coro_start_time is None: 99 return 100 101 if self.hunged_coro is not None: 102 return 103 104 coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time 105 if coro_execution_piece_delta_time > self.period: 106 raise_signal(self.raised_signal) 107 108 def is_in_main_thread(self): 109 return current_thread() is main_thread() 110 111 def single_task_registration_or_immediate_processing( 112 self, period: RationalNumber) -> Tuple[bool, None, None]: 113 result = False 114 if self.is_in_main_thread(): 115 self._loop.set_coro_time_measurement(True) 116 if not self.handler_set: 117 self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler) 118 self.handler_set = True 119 120 if self.watchdog_thread is None: 121 self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True) 122 self.watchdog_thread.start() 123 124 self.period = period 125 result = True 126 127 return result, None, None 128 129 def full_processing_iteration(self): 130 self.make_dead() 131 132 def in_work(self) -> bool: 133 result: bool = self.keyboard_interrupt_emited 134 return self.thrifty_in_work(result) 135 136 def interrupt_handler(self, sig, frame): 137 if self.hunged_coro is None: 138 return 139 140 loop: CoroSchedulerType = self._loop 141 if get_current_coro_scheduler() is not loop: 142 return 143 144 if current_interface() is not self.hunged_coro.interface: 145 return 146 147 if loop.current_coro_start_time != self.hunged_coro.coro_start_time: 148 return 149 150 try: 151 raise WatchdogTimeoutError 152 finally: 153 self.hunged_coro = None 154 155 def destroy(self): 156 if self.watchdog_thread is not None: 157 self.watchdog_thread_allowed = False 158 while self.watchdog_thread_in_work: 159 sleep(0.0001, high_cpu_utilisation_mode=True) 160 161 # self.watchdog_thread.join() # we don ot need this since it is a daemon thread 162 self.watchdog_thread = None 163 164 if self.handler_set: 165 signal.signal(self.raised_signal, self.previous_handler) 166 self.handler_set = False
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
60 def __init__(self, loop: CoroSchedulerType): 61 super().__init__(loop) 62 self.handler_set: bool = False 63 self.previous_handler = None 64 self.keyboard_interrupt_emited: bool = False 65 self.period: RationalNumber = 5 66 if 'nt' == OS_API_TYPE: 67 self.raised_signal = SIG_IGN 68 else: 69 self.raised_signal = SIGUSR1 70 71 self.hunged_coro: Optional[HungedCoroInfo] = None 72 self.watchdog_thread: Optional[Thread] = None 73 self.watchdog_thread_name: str = '_CengalCoroutinesWatchdogDeamon_' 74 self.watchdog_thread_allowed: bool = True 75 self.watchdog_thread_in_work: bool = False
86 def check_idle_coro(self): 87 loop: CoroSchedulerType = self._loop 88 if get_current_coro_scheduler() is not loop: 89 return 90 91 curr_interface: Optional[Interface] = current_interface() 92 if curr_interface is None: 93 return 94 95 if curr_interface.ignored_by_watchdog: 96 return 97 98 if loop.current_coro_start_time is None: 99 return 100 101 if self.hunged_coro is not None: 102 return 103 104 coro_execution_piece_delta_time = loop.get_coro_start_time() - loop.current_coro_start_time 105 if coro_execution_piece_delta_time > self.period: 106 raise_signal(self.raised_signal)
111 def single_task_registration_or_immediate_processing( 112 self, period: RationalNumber) -> Tuple[bool, None, None]: 113 result = False 114 if self.is_in_main_thread(): 115 self._loop.set_coro_time_measurement(True) 116 if not self.handler_set: 117 self.previous_handler = signal.signal(self.raised_signal, self.interrupt_handler) 118 self.handler_set = True 119 120 if self.watchdog_thread is None: 121 self.watchdog_thread = Thread(target=self.watchdog_thread_func, name=self.watchdog_thread_name, daemon=True) 122 self.watchdog_thread.start() 123 124 self.period = period 125 result = True 126 127 return result, None, None
132 def in_work(self) -> bool: 133 result: bool = self.keyboard_interrupt_emited 134 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
136 def interrupt_handler(self, sig, frame): 137 if self.hunged_coro is None: 138 return 139 140 loop: CoroSchedulerType = self._loop 141 if get_current_coro_scheduler() is not loop: 142 return 143 144 if current_interface() is not self.hunged_coro.interface: 145 return 146 147 if loop.current_coro_start_time != self.hunged_coro.coro_start_time: 148 return 149 150 try: 151 raise WatchdogTimeoutError 152 finally: 153 self.hunged_coro = None
155 def destroy(self): 156 if self.watchdog_thread is not None: 157 self.watchdog_thread_allowed = False 158 while self.watchdog_thread_in_work: 159 sleep(0.0001, high_cpu_utilisation_mode=True) 160 161 # self.watchdog_thread.join() # we don ot need this since it is a daemon thread 162 self.watchdog_thread = None 163 164 if self.handler_set: 165 signal.signal(self.raised_signal, self.previous_handler) 166 self.handler_set = False
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id