cengal.parallel_execution.coroutines.coro_tools.loop_administration.admin_tk.versions.v_0.admin_tk
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['start_admin', 'cs_init', 'SchedulerPerformanceFormatter', 'scheduler_stats_aggregator_provider', 'CSStatsFormatterMultiprocess', 20 'CorosLogsProvider', 'CoroLogsAppendFormatter', 'CoroLogsAppendFormatterParts'] 21 22 23""" 24Module Docstring 25Docstrings: http://www.python.org/dev/peps/pep-0257/ 26""" 27 28__author__ = "ButenkoMS <gtalk@butenkoms.space>" 29__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. 
Contacts: <gtalk@butenkoms.space>" 30__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 31__license__ = "Apache License, Version 2.0" 32__version__ = "4.4.1" 33__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 34__email__ = "gtalk@butenkoms.space" 35# __status__ = "Prototype" 36__status__ = "Development" 37# __status__ = "Production" 38 39 40from cengal.time_management.cpu_clock_cycles import perf_counter 41import ttkbootstrap as ttkb 42from ttkbootstrap.scrolled import ScrolledText as TtkbScrolledText 43from tkinter import simpledialog 44import tkinter as tk 45from ttkbootstrap import Style 46from pprintpp import pformat as pf 47from typing import Callable, Hashable, Optional, Set, Dict, Tuple, Union, List, Sequence, Any, cast 48 49from cengal.parallel_execution.coroutines.coro_scheduler import * 50from cengal.parallel_execution.coroutines.coro_standard_services.tkinter import * 51from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro, PutCoroRequest 52from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 53from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep 54from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield 55from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import AsyncioLoopRequest, run_in_thread_pool, run_in_thread_pool_fast 56from cengal.parallel_execution.coroutines.coro_standard_services.fast_aggregator import * 57from cengal.parallel_execution.coroutines.coro_standard_services.log import * 58from cengal.parallel_execution.coroutines.coro_standard_services.instance import InstanceRequest 59from cengal.parallel_execution.coroutines.coro_tools.coro_flow_control import graceful_coro_destroyer, agraceful_coro_destroyer, GracefulCoroDestroy 60from cengal.parallel_execution.coroutines.coro_tools.lock import Lock 61from cengal.statistics.normal_distribution import average 62from 
cengal.parallel_execution.multiprocess.multiprocessing_task_runner import * 63from cengal.text_processing.text_processing import normalize_line_separators_and_tabs 64from cengal.introspection.inspect import get_exception, exception_to_printable_text 65from cengal.system import OS_TYPE 66from cengal.data_manipulation.dict_path import * 67from cengal.user_interface.gui.tkinter.components.read_only_text import * 68from cengal.user_interface.gui.tkinter.components.tool_tip import * 69from cengal.user_interface.gui.tkinter.components.aggregator_view import * 70from cengal.math.numbers import RationalNumber 71from cengal.code_flow_control.args_manager import args_kwargs_to_str, ArgsKwargs 72from greenlet import GreenletExit 73from enum import IntFlag, auto 74from uuid import uuid4 75 76 77command_executor_aggregator_view_key = 'command_executor_aggregator' 78command_executor_aggregator_append_view_key = 'command_executor_aggregator_append' 79 80coro_holder_header = 'async def coro_holder(i: Interface, fac: FastAggregatorClient, aggregator_key, aggregator_append_key):' 81coro_holder_template = f'''from cengal.parallel_execution.coroutines.coro_scheduler import * 82from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 83from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import * 84from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import * 85from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list import * 86from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import * 87from cengal.parallel_execution.coroutines.coro_standard_services.sleep import * 88from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro import * 89from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro_list import * 90from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import * 91from 
def prepare_coro_body(text: str) -> str:
    """Turn console input into the indented body of the ``coro_holder`` coroutine.

    The returned string is meant to be substituted for ``{coro_body}`` in
    ``coro_holder_template``, directly below the ``async def coro_holder(...)``
    header, which is why every line is prefixed with four spaces.

    Args:
        text: Raw text taken from the input field. May be empty, ``None`` or
            contain nothing but whitespace.

    Returns:
        The text, passed through ``normalize_line_separators_and_tabs``, with
        each line indented by four spaces. Input without any non-whitespace
        character yields ``'    pass'``.
    """
    # Any whitespace-only input (not just '' and '\n') must become ``pass``:
    # otherwise the generated ``async def`` would contain no statement and
    # ``exec`` of the template would fail with a syntax error.
    if (not text) or (not text.strip()):
        text = 'pass'

    text = normalize_line_separators_and_tabs(text)
    return '\n'.join(' ' * 4 + line for line in text.splitlines())
'Darwin' == OS_TYPE: 148 self.control_key_name = 'Cmd' 149 else: 150 self.control_key_name = 'Ctrl' 151 152 self.buttons_frame = ttkb.Frame(self) 153 self.run_button_text = '>>' 154 self.run_button_width = len(self.run_button_text) + 1 155 self.run_button = ttkb.Button(self.buttons_frame, text=self.run_button_text, width=self.run_button_width, command=self.run) 156 self.run_button_tooltip = ToolTipHovered(self.run_button, f'<{self.control_key_name}+Enter> Run and clear input field. Prevents human missclicks') 157 158 self.run_button_2_text = 'R>' 159 self.run_button_2_width = len(self.run_button_2_text) + 1 160 self.run_button_2 = ttkb.Button(self.buttons_frame, text=self.run_button_2_text, width=self.run_button_2_width, command=self.run_2) 161 self.run_button_2_tooltip = ToolTipHovered(self.run_button_2, f'<{self.control_key_name}+Alt+Enter> Run') 162 163 self.stop_button_text = 'X' 164 self.stop_button_width = len(self.stop_button_text) + 1 165 self.stop_button = ttkb.Button(self.buttons_frame, text=self.stop_button_text, width=self.stop_button_width, command=self.kill) 166 self.stop_button['state'] = 'disabled' 167 self.stop_button_tooltip = ToolTipHovered(self.stop_button, f'<{self.control_key_name}+x> Stop execution') 168 169 self.text_out_area = ReadOnlyText(self, **text_area_config) 170 171 172 self.func_header_read_only = ttkb.Label(self, text=coro_holder_header, font=('Helvetica', 8, 'bold')) 173 self.func_header_read_only.pack(fill='x', expand='yes', side='top') 174 175 self.text_in_area.pack(fill='both', side='left') 176 self.run_button.pack(fill='both', side='top', pady=1) 177 self.run_button_2.pack(fill='both', side='top', pady=1) 178 self.stop_button.pack(fill='both', side='top', pady=1) 179 self.buttons_frame.pack(fill='both', side='left') 180 self.text_out_area.pack(fill='both', expand='yes', side='left') 181 182 self.text_in_area._text.bind("<Control-Return>", self.ctrl_enter) 183 self.text_in_area._text.bind("<Control-Alt-Return>", 
self.ctrl_alt_enter) 184 self.text_in_area._text.bind("<Control-x>", self.ctrl_x) 185 186 def ctrl_enter(self, event): 187 self.run() 188 return 'break' 189 190 def ctrl_alt_enter(self, event): 191 self.run_2() 192 return 'break' 193 194 def ctrl_x(self, event): 195 self.kill() 196 return 'break' 197 198 def run(self, clear_input: bool = True): 199 text = self.text_in_area._text.get('1.0', 'end') 200 if clear_input: 201 self.text_in_area._text.delete('1.0', 'end') 202 203 self.text_out_area._text.insert('end', text) 204 205 # disable the intput 206 self.run_button['text'] = '...' 207 self.run_button_width = len(self.run_button['text']) + 1 208 self.run_button['state'] = 'disabled' 209 self.run_button_2['text'] = '...' 210 self.run_button_2_width = len(self.run_button_2['text']) + 1 211 self.run_button_2['state'] = 'disabled' 212 self.stop_button['state'] = 'normal' 213 self.text_in_area._text['state'] = 'disabled' 214 self.text_in_area.state(["disabled"]) 215 216 self.current_coro_id = current_interface()(PutCoro, self.eval_text, text) 217 218 def run_2(self): 219 return self.run(clear_input = False) 220 221 def kill(self): 222 if self.current_coro_id is not None: 223 graceful_coro_destroyer(current_interface(), None, self.current_coro_id, first_phase_is_wait=False) 224 self.current_coro_id = None 225 self.run_button['text'] = self.run_button_text 226 self.run_button_width = self.run_button_width 227 self.run_button['state'] = 'normal' 228 self.run_button_2['text'] = self.run_button_2_text 229 self.run_button_2_width = self.run_button_2_width 230 self.run_button_2['state'] = 'normal' 231 self.stop_button['state'] = 'disabled' 232 self.text_in_area._text['state'] = 'normal' 233 self.text_in_area.state(["!disabled"]) 234 235 def eval_text(self, i: Interface, text): 236 self.text_out_area._text.delete('1.0', 'end') 237 238 # evalueate the text 239 try: 240 result = None 241 local_vars = { 242 'i': i, 243 'result': result, 244 'aggregator_key': 
class CSStatsFormatterMultiprocess:
    """Callable formatter that pretty-prints scheduler stats in a subprocess.

    Calling the instance sends the stats to a ``SubprocessWorker``, polls for
    the answer (sleeping through the ``Sleep`` service between polls) and
    returns the formatted text. The dict paths listed in ``filtered_paths``
    are removed from the data inside the subprocess before formatting.
    """

    def __init__(self, filtered_paths: Optional[List[Sequence[str]]] = None):
        """Configure the worker subprocess and start it without waiting for readiness.

        Args:
            filtered_paths: Dict paths to drop from the stats before they are
                formatted. ``None`` selects the built-in default list below.
        """
        self.filtered_paths: List[Sequence[str]] = [
            ['CoroSchedulerBase', 'loop', 'coroutines execution times'],
            ['CoroSchedulerBase', 'loop', 'longest continuous execution time of coroutines'],
            ['CoroSchedulerGreenlet', 'loop', 'coroutines execution times'],
            ['CoroSchedulerGreenlet', 'loop', 'longest continuous execution time of coroutines'],
            ['CoroSchedulerAwaitable', 'loop', 'coroutines execution times'],
            ['CoroSchedulerAwaitable', 'loop', 'longest continuous execution time of coroutines'],
        ] if filtered_paths is None else filtered_paths
        self._stop: bool = False
        self._subprocess_started: bool = False
        settings = SubprocessWorkerSettings()
        settings.initiation_function = self.process_initializer
        settings.working_function = self.process_worker
        settings.transport = Transport.queue
        settings.sendable_data_type = SendableDataType.marshalable
        settings.use_internal_subprocess_input_buffer = True
        settings.initialization_data = {
            'filtered_paths': self.filtered_paths
        }

        self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
        self.worker: SubprocessWorker = SubprocessWorker(settings)
        self.worker.start(wait_for_process_readyness=False)
        self.worker_is_ready: bool = False

    async def _worker_readyness_waiting_coro(self, i: Interface):
        """Poll the subprocess every 0.1 s until it is ready, then set ``worker_is_ready``."""
        need_to_wait = True
        while need_to_wait:
            try:
                async with self.worker_lock:
                    self.worker.wait_for_subprocess_readines(block=False)

                need_to_wait = False
            except SubprocessIsNotReadyError:
                await i(Sleep, 0.1)

        self.worker_is_ready = True

    async def wait_for_worker_readyness(self, i: Interface):
        """Sleep in 0.1 s steps until ``worker_is_ready`` becomes true."""
        while not self.worker_is_ready:
            await i(Sleep, 0.1)

    def start(self, wr: Optional[TkObjWrapper] = None):
        """Schedule the readiness-waiting coroutine.

        Args:
            wr: When given, the coroutine is put through ``wr.put_coro``;
                otherwise through ``PutCoro`` on the current interface.
        """
        if wr is None:
            i = current_interface()
            i(PutCoro, self._worker_readyness_waiting_coro)
        else:
            wr.put_coro(self._worker_readyness_waiting_coro)

    def stop(self):
        """Mark the formatter as stopped and stop the worker subprocess.

        If no current interface can be obtained the worker is stopped with a
        direct blocking call; otherwise the stop is handed to
        ``run_in_thread_pool_fast``.
        """
        self._stop = True
        i: Optional[Interface] = None
        need_to_block = False
        try:
            i = current_interface()
        except Exception:
            # NOTE(review): current_interface() is expected to raise when there
            # is no scheduler/coroutine context - confirm the exception type.
            # ``Exception`` (not a bare ``except``) keeps KeyboardInterrupt,
            # SystemExit and GreenletExit propagating.
            need_to_block = True

        with self.worker_lock:
            if need_to_block:
                self.worker.stop()
            else:
                run_in_thread_pool_fast(i, self.worker.stop)

    def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
        """Replace the filter list and, unless stopped, send it to the subprocess."""
        # The result is unused; the call is kept because it may raise outside
        # of a coroutine context, exactly as before.
        i: Interface = current_interface()
        self.filtered_paths = filtered_paths
        with self.worker_lock:
            if not self._stop:
                self.worker.send_data_to_subprocess({
                    'type': 'filter',
                    'data': self.filtered_paths
                })

    def __call__(self, data):
        """Format ``data`` in the subprocess and return the resulting text.

        Args:
            data: Stats object to format; sent to the subprocess as the
                ``'data'`` field of a ``{'type': 'data', ...}`` message.

        Returns:
            A placeholder string while the subprocess is not ready yet,
            otherwise ``'<coro id>. CoroScheduler stats:\\n<formatted text>'``.
            The formatted part is empty when the formatter was stopped before
            a ``'data'`` answer arrived.
        """
        with self.worker_lock:
            if not self.worker_is_ready:
                return '<<Waiting for subprocess to start...>>'

            i: Interface = current_interface()
            self.worker.send_data_to_subprocess({
                'type': 'data',
                'data': data
            })

            # Only an answer of type 'data' is accepted; anything else is
            # discarded so that it can never leak into the returned text.
            answer = None
            while (answer is None) and (not self._stop):
                try:
                    candidate = self.worker.get_answer_from_subprocess(block=False)
                except Empty:
                    candidate = None

                if (candidate is not None) and ('data' == candidate['type']):
                    answer = candidate
                else:
                    i(Sleep, 0.01)

            result = '' if answer is None else answer['data']
            result = f'{i.coro_id}. CoroScheduler stats:\n{result}'
            return result

    @staticmethod
    def process_initializer(init_data) -> Any:
        """Subprocess initiation function: returns ``init_data`` unchanged.

        NOTE(review): the local import is not used here; presumably it makes
        sure ``pprintpp`` is importable/loaded in the subprocess - confirm.
        """
        from pprintpp import pformat as pf
        return init_data

    @staticmethod
    def process_worker(global_data, data_msg):
        """Subprocess working function: handle one message from the parent.

        Args:
            global_data: Value returned by ``process_initializer``; expected to
                be a dict with a ``'filtered_paths'`` entry.
            data_msg: Dict with ``'type'`` (``'filter'`` or ``'data'``) and
                ``'data'`` entries.

        Returns:
            ``None`` for ``'filter'`` messages and unknown types (no answer is
            sent back); for ``'data'`` messages the same ``data_msg`` with its
            ``'data'`` replaced by the pretty-printed text.
        """
        data = data_msg['data']
        msg_type = data_msg['type']
        if 'filter' == msg_type:
            if global_data is None:
                # NOTE(review): this only rebinds the local name; whether the
                # new dict survives until the next call depends on
                # SubprocessWorker - verify.
                global_data = dict()

            global_data['filtered_paths'] = data
            return None  # no answer to parent process
        elif 'data' == msg_type:
            # Tolerate missing initialization data instead of crashing the worker.
            filtered_paths = (global_data or dict()).get('filtered_paths', ())
            for path in filtered_paths:
                try_del_dict_item(data, path)

            data = pf(data, indent=4, width=120)
            data_msg['data'] = data
            return data_msg

        return None
class CoroLogsAppendFormatterParts(IntFlag):
    """Bit flags naming the individual sections of a rendered log record.

    Members combine with ``|``. ``none`` selects no section, ``all`` selects
    every section.
    """

    none = 0

    # One bit per section, in declaration order.
    prompt_string = 1 << 0
    time = 1 << 1
    caller_info = 1 << 2
    logging_level = 1 << 3
    log = 1 << 4
    coros_traceback = 1 << 5
    file_name_and_line_number = 1 << 6
    traceback = 1 << 7

    # Union of all single-bit members above.
    all = (
        prompt_string
        | time
        | caller_info
        | logging_level
        | log
        | coros_traceback
        | file_name_and_line_number
        | traceback
    )
class CorosLogsProvider:
    """Collects log batches and forwards them to a fast-aggregator key.

    The instance registers itself as an iteration handler with the ``Log``
    service (see ``_update``). Batches received through ``__call__`` are
    queued in ``_unsend_data`` and flushed to ``coros_logs_key`` by the
    ``_update`` coroutine every ``period`` seconds.
    """

    def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
        """Store the configuration; nothing is scheduled until ``start()``.

        Args:
            coros_logs_key: Aggregator key the log batches are sent to.
            logs_limit: Maximum number of log records kept per batch;
                ``None`` means unlimited, ``0`` or less keeps nothing.
            period: Delay between two flushes, passed to ``Sleep``.
            force_stop_timeout: Timeout passed to ``agraceful_coro_destroyer``
                when the update coroutine is destroyed on ``stop()``.
        """
        self.coros_logs_key: Hashable = coros_logs_key
        self.logs_limit: Union[None, int] = logs_limit
        self.period: float = period
        self.i: Interface = None
        self._update_coro_id: CoroID = None
        self.force_stop_timeout: RationalNumber = force_stop_timeout
        self.fac = FastAggregatorClient()
        self._stop = False
        self.lifetime_stats: Dict[CoroID, float] = dict()
        # False until the first batch arrived; the first batch is back-filled
        # with logs the service already holds.
        self._current_logs_taken: bool = False
        self._unsend_data: List[List[Tuple[Tuple, Dict]]] = list()
        self._log_service_handler_was_added: bool = False

    def start(self):
        """Schedule the ``_update`` coroutine and remember its id."""
        if self.i is None:
            self.i = current_interface()

        self._update_coro_id = self.i(PutCoro, self._update)

    def stop(self):
        """Request the update loop to end and schedule ``_force_stop``."""
        self._stop = True
        current_interface()(PutCoro, self._force_stop)

    async def _force_stop(self, i: Interface):
        """Destroy the update coroutine, unregister the handler, drop queued data."""
        if self._update_coro_id is not None:
            update_coro_id = self._update_coro_id
            self._update_coro_id = None
            await agraceful_coro_destroyer(i, self.force_stop_timeout, update_coro_id)

        if self._log_service_handler_was_added:
            self._log_service_handler_was_added = False
            await i(LogRequest().remove_iteration_handler(self))

        self._unsend_data = type(self._unsend_data)()

    async def _update(self, i: Interface):
        """Flush queued batches to the aggregator every ``period`` seconds.

        The iteration handler is added lazily, once the ``Log`` service is
        reported as registered in the loop.
        """
        while not self._stop:
            if not self._log_service_handler_was_added:
                if i._loop.is_service_registered(Log):
                    self._log_service_handler_was_added = True
                    await i(LogRequest().add_iteration_handler(self))

            # Swap the queue first so batches arriving meanwhile are not lost.
            logs_parts = self._unsend_data
            self._unsend_data = type(self._unsend_data)()
            for logs_part in logs_parts:
                self.fac(self.coros_logs_key, logs_part)

            await i(Sleep, self.period)

    def _select_logs(self, log_service: Log, data: List[Tuple[Tuple, Dict, Dict[str, Any]]], first_call: bool) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]:
        """Return the records of ``data`` to queue, honouring ``logs_limit``."""
        limit = self.logs_limit
        if limit is None:
            if first_call:
                return log_service.get_last_n_logs(None) + data

            return data

        if limit <= 0:
            # ``data[-0:]`` would be the whole list, so handle this explicitly.
            return data[:0]

        missing: int = limit - len(data)
        if first_call and (0 < missing):
            # Back-fill with the newest already stored logs up to the limit.
            return log_service.get_last_n_logs(missing) + data

        if 0 <= missing:
            return data

        return data[-limit:]

    def __call__(self, log_service: Log, data: List[Tuple[Tuple, Dict, Dict[str, Any]]], current_time: float, current_time_str: str):
        """Queue one batch of new log records (presumably called by the ``Log`` service).

        On the first call the batch is prefixed with logs fetched through
        ``log_service.get_last_n_logs`` so that the total does not exceed
        ``logs_limit``. Each call queues at most one batch; empty batches are
        not queued.

        Args:
            log_service: Object providing ``get_last_n_logs(n)``.
            data: New log records of the current iteration.
            current_time: Unused here.
            current_time_str: Unused here.
        """
        first_call = not self._current_logs_taken
        # Set before touching the log service, as the original code did.
        self._current_logs_taken = True

        batch = self._select_logs(log_service, data, first_call)
        if batch:
            self._unsend_data.append(batch)
Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}') 623 self.worker: SubprocessWorker = SubprocessWorker(settings) 624 self.worker.start(wait_for_process_readyness=False) 625 self.worker_is_ready: bool = False 626 627 async def _worker_readyness_waiting_coro(self, i: Interface): 628 need_to_wait = True 629 while need_to_wait: 630 try: 631 async with self.worker_lock: 632 self.worker.wait_for_subprocess_readines(block=False) 633 634 need_to_wait = False 635 except SubprocessIsNotReadyError: 636 await i(Sleep, 0.1) 637 638 self.worker_is_ready = True 639 640 async def wait_for_worker_readyness(self, i: Interface): 641 while not self.worker_is_ready: 642 await i(Sleep, 0.1) 643 644 def start(self, wr: Optional[TkObjWrapper] = None): 645 if self.i is None: 646 self.i = current_interface() 647 648 if wr is None: 649 self.i(PutCoro, self._worker_readyness_waiting_coro) 650 self.i(PutCoro, self._update) 651 self.i(PutCoro, self._update_stats) 652 else: 653 wr.put_coro(self._worker_readyness_waiting_coro) 654 wr.put_coro(self._update) 655 wr.put_coro(self._update_stats) 656 657 def stop(self): 658 self._stop = True 659 need_to_block = False 660 try: 661 i: Interface = current_interface() 662 except: 663 need_to_block = True 664 665 with self.worker_lock: 666 if need_to_block: 667 self.worker.stop() 668 else: 669 run_in_thread_pool_fast(i, self.worker.stop) 670 671 def _update(self, i: Interface): 672 i(RunCoro, self.wait_for_worker_readyness) 673 while not self._stop: 674 # start = perf_counter() 675 i(Yield) 676 # stop = perf_counter() 677 # delta_time = stop - start 678 loop: CoroSchedulerType = i._loop 679 delta_time = loop.loop_iteration_delta_time 680 self.fac(self.external_items_key, delta_time) 681 self.fac(self.internal_items_key, delta_time) 682 683 def _update_stats(self, i: Interface): 684 i(RunCoro, self.wait_for_worker_readyness) 685 while not self._stop: 686 try: 687 data = i(FastAggregator, self.internal_items_key) 688 if len(data) >= self.window_size: 
689 self.data_window = data[-self.window_size:] 690 else: 691 additional_data_len = self.window_size - len(data) 692 if len(self.data_window) >= additional_data_len: 693 self.data_window = self.data_window[-additional_data_len:] + data 694 else: 695 self.data_window.extend(data) 696 697 iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = self.compute_stats(i, self.data_window) 698 if self.max_deviation is None: 699 self.max_deviation = max_deviation 700 else: 701 self.max_deviation = max(self.max_deviation, max_deviation) 702 703 if self.min_deviation is None: 704 self.min_deviation = min_deviation 705 else: 706 self.min_deviation = min(self.min_deviation, min_deviation) 707 708 if self.max_iter_per_sec is None: 709 self.max_iter_per_sec = iter_per_sec 710 else: 711 self.max_iter_per_sec = max(self.max_iter_per_sec, iter_per_sec) 712 713 if self.min_iter_per_sec is None: 714 self.min_iter_per_sec = iter_per_sec 715 else: 716 self.min_iter_per_sec = min(self.min_iter_per_sec, iter_per_sec) 717 718 self.fac(self.stats_key, (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)) 719 self.fac(self.lifetime_stats_key, (self.max_iter_per_sec, self.min_iter_per_sec, self.max_deviation, self.min_deviation)) 720 721 self.window_size = 2 * int(round(iter_per_sec)) 722 except KeyError: 723 pass 724 725 i(Sleep, 0.2) 726 727 def compute_stats(self, i: Interface, data): 728 with self.worker_lock: 729 self.worker.send_data_to_subprocess(data) 730 result = None 731 got_result = False 732 while not got_result: 733 try: 734 result = self.worker.get_answer_from_subprocess(block=False) 735 except Empty: 736 pass 737 else: 738 got_result = True 739 740 if not got_result: 741 i(Sleep, 0.005) 742 743 return result 744 745 @staticmethod 746 def process_initializer(init_data): 747 pass 748 749 @staticmethod 750 def process_worker(global_data, data): 751 from cengal.statistics.normal_distribution import count_99_95_68 752 val_99, val_95, val_68, 
class HelpDialog(simpledialog.Dialog):
    """Dialog that displays ``coro_holder_template`` in a read-only text area."""

    def body(self, master):
        """Create the dialog content.

        Returns:
            The read-only text area, which receives the initial focus.
        """
        self.title('Console Description and Help')

        view_options = dict(
            highlightcolor=Style.instance.colors.primary,
            highlightbackground=Style.instance.colors.border,
            highlightthickness=1,
            wrap='none',
            width=120,
            height=30,
        )

        # The widget is parented to the dialog itself (not to ``master``).
        self.text_area = ReadOnlyText(self, **view_options)
        self.text_area.pack(fill='both', expand='yes')

        inner_text = self.text_area._text
        inner_text.delete('1.0', 'end')
        inner_text.insert('end', coro_holder_template)

        return self.text_area  # initial focus


class FilterSetupDialog(ttkb.Toplevel):
    """Top-level window for editing the filtered dict paths, one path per line.

    After the window is closed ``result`` holds the edited list (OK) or the
    list originally passed in (Cancel / window close).
    """

    def __init__(self, parent, filtered_paths):
        """Build the window centered over ``parent`` and pre-fill the editor.

        Args:
            parent: Parent window; used for positioning and ``transient``.
            filtered_paths: Paths shown initially; also the result on cancel.
        """
        # Center a 600x400 window over the parent.
        left, top = parent.winfo_x(), parent.winfo_y()
        parent_width, parent_height = parent.winfo_width(), parent.winfo_height()
        width, height = 600, 400
        pos_x = left + (parent_width - width) // 2
        pos_y = top + (parent_height - height) // 2
        super().__init__(parent, size=(width, height), resizable=(True, True), position=(pos_x, pos_y))

        self.filtered_paths = filtered_paths
        self.title('Filtered paths setup')
        self.transient(parent)
        self.result = None

        caption = ttkb.Label(self, text='Enter your text:')
        caption.pack(fill=None, expand='no', side='top')

        self.text_entry = TtkbScrolledText(self, width=30, height=10)
        self.text_entry.pack(fill='both', expand='yes', side='top')

        buttons = ttkb.Frame(self)
        buttons.pack(fill='x', expand='no', side='right')

        ok_button = ttkb.Button(buttons, text='OK', command=self.ok)
        ok_button.pack(side=ttkb.LEFT, padx=1)
        cancel_button = ttkb.Button(buttons, text='Cancel', command=self.cancel)
        cancel_button.pack(side=ttkb.LEFT, padx=1)

        self.protocol('WM_DELETE_WINDOW', self.cancel)
        self.text_entry.focus_set()

        # One path per line, with a trailing newline after the last one.
        initial_text = '\n'.join([dict_path_to_str(path) for path in self.filtered_paths]) + '\n'
        editor = self.text_entry._text
        editor.insert('1.0', initial_text)

        editor.bind('<Control-Return>', self.ctrl_enter)
        editor.bind('<Escape>', self.escape)
        editor.focus_set()

    def ctrl_enter(self, event):
        """Key handler: confirm the dialog and stop further event processing."""
        self.ok(event)
        return 'break'

    def escape(self, event):
        """Key handler: cancel the dialog and stop further event processing."""
        self.cancel(event)
        return 'break'

    def ok(self, event=None):
        """Parse every non-empty line into a dict path, store the list, close."""
        entered_text = self.text_entry.get('1.0', 'end-1c')
        self.result = [srt_to_dict_path(line) for line in entered_text.split('\n') if line]
        self.destroy()

    def cancel(self, event=None):
        """Keep the original paths as the result and close the window."""
        self.result = self.filtered_paths
        self.destroy()
Application(ttkb.Window): 858 def __init__(self, style: str = 'superhero', filtered_paths: List[List[str]] = None, current_children_pack_type: int = 1): 859 super().__init__(size=(1900, 900), resizable=(True, True)) 860 self.style_name = style 861 Style(style) 862 self.current_children_pack_type: int = current_children_pack_type 863 self.max_children_pack_type: int = 2 864 self.packed_widgets: List[tk.Widget] = list() 865 self.filtered_paths: List[Sequence[str]] = filtered_paths 866 867 self.title('CoroScheduler Admin') 868 self.resizable(width=True, height=True) 869 870 self.sfmp = CSStatsFormatterMultiprocess(self.filtered_paths) 871 872 self.scheduler_stats_frame = ttkb.Frame(self) 873 self.scheduler_stats_control_frame = ttkb.Frame(self.scheduler_stats_frame) 874 875 self.scheduler_stats_layout_button_text = self.scheduler_stats_layout_button_name() 876 self.scheduler_stats_layout_button_text_len = len('L') + 1 877 self.scheduler_stats_layout_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_layout_button_text, width=self.scheduler_stats_layout_button_text_len, command=self.scheduler_stats_layout_button_on_click) 878 self.scheduler_stats_layout_button_tooltip = ToolTipHovered(self.scheduler_stats_layout_button, 'Layout circle switch') 879 880 self.scheduler_stats_format_button_text = 'F' 881 self.scheduler_stats_format_button_text_len = len(self.scheduler_stats_format_button_text) + 1 882 self.scheduler_stats_format_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_format_button_text, width=self.scheduler_stats_format_button_text_len, command=self.scheduler_stats_format_button_on_click) 883 self.scheduler_stats_format_button_tooltip = ToolTipHovered(self.scheduler_stats_format_button, 'Filter our unnecessary or flickering paths') 884 885 self.scheduler_stats_help_button_text = '?' 
886 self.scheduler_stats_help_button_text_len = len(self.scheduler_stats_help_button_text) + 1 887 self.scheduler_stats_help_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_help_button_text, width=self.scheduler_stats_help_button_text_len, bootstyle="info", command=self.scheduler_stats_help_button_on_click) 888 self.scheduler_stats_help_button_tooltip = ToolTipHovered(self.scheduler_stats_help_button, 'Console description and help') 889 890 self.coro_scheduler_view = AggregatorView(False, False, 'coro_scheduler_stats', 1, self.sfmp, 25, 23, self.scheduler_stats_frame) 891 892 self.coro_logs_formatter: CoroLogsAppendFormatter = CoroLogsAppendFormatter('Coro Logs') 893 self.coro_logs_provider: CorosLogsProvider = CorosLogsProvider('coro scheduler logs', None, 0.25) 894 self.coro_scheduler_logs_view = AggregatorView(True, True, 'coro scheduler logs', 1, self.coro_logs_formatter, 25, 23, self.scheduler_stats_frame) 895 896 self.command_executor_view = CommandExecutor(160, 6) 897 898 self.cmet = CorosMaxExecutionTimesProvider('coros lifetime max execution times', 'coros max execution times', 0.5) 899 900 self.coros_lifetime_max_execution_times = AggregatorView(False, False, 'coros lifetime max execution times', 0.75, self.cmet.lifetime_stats_formatter, 25, 13, self) 901 902 self.coros_max_execution_times = AggregatorView(False, False, 'coros max execution times', 0.75, self.cmet.stats_formatter, 25, 13, self) 903 904 self.spf = SchedulerPerformanceFormatter('loop_iteration_delta_times', 'loop_iteration_delta_times_lifetime_stats', 'loop_iteration_delta_times_stats', 5000) 905 906 self.scheduler_tdelta_formatter = AggregatorAppendFormatter('Scheduler TDelta') 907 self.scheduler_tdelta = AggregatorView(True, False, 'loop_iteration_delta_times', 0.1, self.scheduler_tdelta_formatter, 22, 13, self) 908 self.scheduler_tdelta.default_auto_scroll = False 909 self.scheduler_tdelta.max_len = 1000 910 911 def aggregator_view_formatter(data: Any) 
-> str: 912 return f'{current_interface().coro_id}. Aggregator:\n{data}' 913 914 self.command_executor_aggregator_view = AggregatorView(False, False, command_executor_aggregator_view_key, 1, aggregator_view_formatter, 25, 13, self) 915 FastAggregatorClient()(command_executor_aggregator_view_key, str()) 916 917 self.command_executor_aggregator_append_formatter = AggregatorAppendFormatter('Aggregator Append') 918 self.command_executor_aggregator_append_view = AggregatorView(True, True, command_executor_aggregator_append_view_key, 1, self.command_executor_aggregator_append_formatter, 25, 13, self) 919 self.command_executor_aggregator_append_view.max_len = 5000 920 FastAggregatorClient()(command_executor_aggregator_append_view_key, str()) 921 922 self.scheduler_lifetime_stats = AggregatorView(False, False, 'loop_iteration_delta_times_lifetime_stats', 0.3, self.spf.lifetime_stats_formatter, 36, 5, self) 923 924 self.scheduler_stats = AggregatorView(False, False, 'loop_iteration_delta_times_stats', 0.3, self.spf.stats_formatter, 36, 7, self) 925 926 self.pack_children(self.current_children_pack_type) 927 928 # set window size taking into account the size of the widgets 929 self.update() 930 width = self.winfo_width() 931 height = self.winfo_height() 932 x = (self.winfo_screenwidth() // 2) - (width // 2) 933 y = (self.winfo_screenheight() // 2) - (height // 2) 934 self.geometry('{}x{}+{}+{}'.format(width, height, x, y)) 935 self.update() 936 937 def pack_forget_children(self): 938 self.packed_widgets.reverse() 939 for widget in self.packed_widgets: 940 widget.pack_forget() 941 942 def pack_children(self, pack_type: int): 943 self.pack_forget_children() 944 self.packed_widgets.clear() 945 if 0 == pack_type: 946 self.scheduler_stats_layout_button.pack(fill=None, expand='no', side='top', pady=1) 947 self.packed_widgets.append(self.scheduler_stats_layout_button) 948 self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1) 949 
self.packed_widgets.append(self.scheduler_stats_format_button) 950 self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1) 951 self.packed_widgets.append(self.scheduler_stats_help_button) 952 self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left') 953 self.packed_widgets.append(self.scheduler_stats_control_frame) 954 self.coro_scheduler_view.pack(fill='both', expand='yes', side='left') 955 self.packed_widgets.append(self.coro_scheduler_view) 956 self.coro_scheduler_logs_view.pack(fill='both', expand='no', side='left') 957 self.packed_widgets.append(self.coro_scheduler_logs_view) 958 959 self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom') 960 self.packed_widgets.append(self.scheduler_stats_frame) 961 self.command_executor_view.pack(fill='x', expand='no', side='bottom') 962 self.packed_widgets.append(self.command_executor_view) 963 self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left') 964 self.packed_widgets.append(self.coros_lifetime_max_execution_times) 965 self.coros_max_execution_times.pack(fill='x', expand='no', side='left') 966 self.packed_widgets.append(self.coros_max_execution_times) 967 self.scheduler_tdelta.pack(fill='x', expand='no', side='left') 968 self.packed_widgets.append(self.scheduler_tdelta) 969 self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right') 970 self.packed_widgets.append(self.command_executor_aggregator_view) 971 self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right') 972 self.packed_widgets.append(self.command_executor_aggregator_append_view) 973 self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top') 974 self.packed_widgets.append(self.scheduler_lifetime_stats) 975 self.scheduler_stats.pack(fill=None, expand='no', side='bottom') 976 self.packed_widgets.append(self.scheduler_stats) 977 elif 1 == pack_type: 978 self.scheduler_stats_layout_button.pack(fill=None, expand='no', 
side='top', pady=1) 979 self.packed_widgets.append(self.scheduler_stats_layout_button) 980 self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1) 981 self.packed_widgets.append(self.scheduler_stats_format_button) 982 self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1) 983 self.packed_widgets.append(self.scheduler_stats_help_button) 984 self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left') 985 self.packed_widgets.append(self.scheduler_stats_control_frame) 986 self.coro_scheduler_view.pack(fill='both', expand='yes', side='left') 987 self.packed_widgets.append(self.coro_scheduler_view) 988 self.coro_scheduler_logs_view.pack(fill='both', expand='yes', side='left') 989 self.packed_widgets.append(self.coro_scheduler_logs_view) 990 991 self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom') 992 self.packed_widgets.append(self.scheduler_stats_frame) 993 self.command_executor_view.pack(fill='x', expand='no', side='bottom') 994 self.packed_widgets.append(self.command_executor_view) 995 self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left') 996 self.packed_widgets.append(self.coros_lifetime_max_execution_times) 997 self.coros_max_execution_times.pack(fill='x', expand='no', side='left') 998 self.packed_widgets.append(self.coros_max_execution_times) 999 self.scheduler_tdelta.pack(fill='x', expand='no', side='left') 1000 self.packed_widgets.append(self.scheduler_tdelta) 1001 self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right') 1002 self.packed_widgets.append(self.command_executor_aggregator_view) 1003 self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right') 1004 self.packed_widgets.append(self.command_executor_aggregator_append_view) 1005 self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top') 1006 self.packed_widgets.append(self.scheduler_lifetime_stats) 1007 
self.scheduler_stats.pack(fill=None, expand='no', side='bottom') 1008 self.packed_widgets.append(self.scheduler_stats) 1009 elif 2 == pack_type: 1010 self.scheduler_stats_layout_button.pack(fill=None, expand='no', side='top', pady=1) 1011 self.packed_widgets.append(self.scheduler_stats_layout_button) 1012 self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1) 1013 self.packed_widgets.append(self.scheduler_stats_format_button) 1014 self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1) 1015 self.packed_widgets.append(self.scheduler_stats_help_button) 1016 self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left') 1017 self.packed_widgets.append(self.scheduler_stats_control_frame) 1018 self.coro_scheduler_view.pack(fill='both', expand='no', side='left') 1019 self.packed_widgets.append(self.coro_scheduler_view) 1020 self.coro_scheduler_logs_view.pack(fill='both', expand='yes', side='left') 1021 self.packed_widgets.append(self.coro_scheduler_logs_view) 1022 1023 self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom') 1024 self.packed_widgets.append(self.scheduler_stats_frame) 1025 self.command_executor_view.pack(fill='x', expand='no', side='bottom') 1026 self.packed_widgets.append(self.command_executor_view) 1027 self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left') 1028 self.packed_widgets.append(self.coros_lifetime_max_execution_times) 1029 self.coros_max_execution_times.pack(fill='x', expand='no', side='left') 1030 self.packed_widgets.append(self.coros_max_execution_times) 1031 self.scheduler_tdelta.pack(fill='x', expand='no', side='left') 1032 self.packed_widgets.append(self.scheduler_tdelta) 1033 self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right') 1034 self.packed_widgets.append(self.command_executor_aggregator_view) 1035 self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right') 1036 
self.packed_widgets.append(self.command_executor_aggregator_append_view) 1037 self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top') 1038 self.packed_widgets.append(self.scheduler_lifetime_stats) 1039 self.scheduler_stats.pack(fill=None, expand='no', side='bottom') 1040 self.packed_widgets.append(self.scheduler_stats) 1041 else: 1042 raise NotImplementedError 1043 1044 def scheduler_stats_layout_button_name(self) -> str: 1045 return 'L' if 0 == self.current_children_pack_type else 'L' + str(self.current_children_pack_type) 1046 1047 def scheduler_stats_layout_button_on_click(self): 1048 self.current_children_pack_type += 1 1049 if self.current_children_pack_type > self.max_children_pack_type: 1050 self.current_children_pack_type = 0 1051 1052 self.scheduler_stats_layout_button_text = self.scheduler_stats_layout_button_name() 1053 self.scheduler_stats_layout_button.config(text=self.scheduler_stats_layout_button_text) 1054 self.pack_children(self.current_children_pack_type) 1055 1056 def scheduler_stats_format_button_on_click(self): 1057 d = FilterSetupDialog(self, self.sfmp.filtered_paths) 1058 self.wait_window(d) 1059 self.sfmp.update_filtered_paths(d.result) 1060 1061 def scheduler_stats_help_button_on_click(self): 1062 HelpDialog(self) 1063 1064 def start(self, wr: TkObjWrapper): 1065 self.spf.start() 1066 self.sfmp.start(wr) 1067 self.coro_logs_provider.start() 1068 self.scheduler_tdelta.start(wr) 1069 self.command_executor_aggregator_view.start(wr) 1070 self.command_executor_aggregator_append_view.start(wr) 1071 self.scheduler_lifetime_stats.start(wr) 1072 self.scheduler_stats.start(wr) 1073 self.coro_scheduler_view.start(wr) 1074 self.coro_scheduler_logs_view.start(wr) 1075 self.cmet.start() 1076 self.coros_lifetime_max_execution_times.start(wr) 1077 self.coros_max_execution_times.start(wr) 1078 1079 def stop(self): 1080 self.spf.stop() 1081 self.sfmp.stop() 1082 self.coro_logs_provider.stop() 1083 self.scheduler_tdelta.stop() 1084 
self.command_executor_aggregator_view.stop() 1085 self.command_executor_aggregator_append_view.stop() 1086 self.scheduler_lifetime_stats.stop() 1087 self.scheduler_stats.stop() 1088 self.coro_scheduler_view.stop() 1089 self.coro_scheduler_logs_view.stop() 1090 self.cmet.stop() 1091 self.coros_lifetime_max_execution_times.stop() 1092 self.coros_max_execution_times.stop() 1093 1094 def prepare(self, i: Interface, wr: TkObjWrapper): 1095 i(Yield) 1096 self.start(wr) 1097 i(Yield) 1098 1099 1100def coro_scheduler_admin__view(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None): 1101 app_args, app_kwargs = app_args_kwargs() if app_args_kwargs is not None else ArgsKwargs()() 1102 with TkinterContextManager(i, Application(*app_args, **app_kwargs)) as wr: 1103 app: Application = cast(Application, wr.tk) 1104 app.prepare(i, wr) 1105 i(InstanceRequest().set('admin_tk_app', app)) 1106 1107 if on_close is not None: 1108 app.stop() 1109 i(RunCoro, on_close) 1110 1111 1112def scheduler_stats_aggregator_provider(i: Interface, fac_key: str = 'coro_scheduler_stats'): 1113 cs: CoroSchedulerType = i._loop 1114 fac = FastAggregatorClient() 1115 while True: 1116 stats_level: EntityStatsMixin.StatsLevel = EntityStatsMixin.StatsLevel.info 1117 result = dict() 1118 name, stats = cs.get_entity_stats(stats_level) 1119 result[name] = stats 1120 fac(fac_key, result) 1121 i(Sleep, 0.5) 1122 1123 1124def cs_init(cs: CoroSchedulerType): 1125 cs.set_coro_time_measurement(True) 1126 cs.set_coro_history_gathering(True) 1127 cs.set_loop_iteration_time_measurement(True) 1128 1129 1130async def start_admin(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None): 1131 cs_init(i._loop) 1132 await i(PutCoroRequest().turn_on_tree_monitoring(True)) 1133 await i(PutCoro, scheduler_stats_aggregator_provider) 1134 await i(PutCoro, coro_scheduler_admin__view, on_close, app_args_kwargs)
async def
start_admin( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, on_close: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]], NoneType] = None, app_args_kwargs: typing.Union[cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs, NoneType] = None):
async def start_admin(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None):
    """Enable scheduler measurements and launch the admin coroutines.

    Calls ``cs_init`` on the scheduler behind ``i``, switches tree monitoring
    on through ``PutCoroRequest``, then puts the stats provider coroutine and
    the admin view coroutine into the scheduler.

    ``on_close`` and ``app_args_kwargs`` are forwarded unchanged to
    ``coro_scheduler_admin__view``.
    """
    scheduler = i._loop
    cs_init(scheduler)

    tree_monitoring_request = PutCoroRequest().turn_on_tree_monitoring(True)
    await i(tree_monitoring_request)

    await i(PutCoro, scheduler_stats_aggregator_provider)
    await i(PutCoro, coro_scheduler_admin__view, on_close, app_args_kwargs)
def
cs_init( cs: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable]):
class
SchedulerPerformanceFormatter:
class SchedulerPerformanceFormatter:
    """Publish scheduler loop-iteration timings and statistics derived from them.

    ``start`` puts three coroutines into the scheduler: one waits until the
    statistics subprocess reports readiness, one records the scheduler's
    ``loop_iteration_delta_time`` after every ``Yield``, and one periodically
    sends a window of the recorded values to the subprocess and publishes the
    returned statistics through ``FastAggregatorClient``.
    """

    def __init__(self, loop_iteration_delta_times_key: Hashable, loop_iteration_delta_times_lifetime_stats_key: Hashable, loop_iteration_delta_times_stats_key: Hashable, window_size: int):
        """Store the aggregator keys and start the statistics subprocess.

        Args:
            loop_iteration_delta_times_key: aggregator key that receives every
                recorded delta time.
            loop_iteration_delta_times_lifetime_stats_key: aggregator key that
                receives the lifetime min/max tuple.
            loop_iteration_delta_times_stats_key: aggregator key that receives
                the per-window statistics tuple.
            window_size: initial number of samples in the statistics window;
                it is recomputed after every statistics pass.
        """
        self.external_items_key: Hashable = loop_iteration_delta_times_key
        # Private key, unique per instance, read back by ``_update_stats``.
        self.internal_items_key: Hashable = f'internal_scheduler_tdelta__{uuid4()}'
        self.lifetime_stats_key: Hashable = loop_iteration_delta_times_lifetime_stats_key
        self.stats_key: Hashable = loop_iteration_delta_times_stats_key
        self.window_size: int = window_size
        self.fac = FastAggregatorClient()
        self.i: Interface = None
        self.data_window: List = list()
        # Lifetime extremes; ``None`` until the first statistics pass.
        self.max_deviation: float = None
        self.min_deviation: float = None
        self.max_iter_per_sec: float = None
        self.min_iter_per_sec: float = None
        self._stop: bool = False

        settings = SubprocessWorkerSettings()
        settings.initiation_function = self.process_initializer
        settings.working_function = self.process_worker
        settings.transport = Transport.queue
        settings.sendable_data_type = SendableDataType.marshalable
        self.worker_lock: Lock = Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}')
        self.worker: SubprocessWorker = SubprocessWorker(settings)
        # Readiness is polled later by ``_worker_readyness_waiting_coro``.
        self.worker.start(wait_for_process_readyness=False)
        self.worker_is_ready: bool = False

    async def _worker_readyness_waiting_coro(self, i: Interface):
        """Poll the subprocess every 0.1 s until it is ready, then set ``worker_is_ready``."""
        need_to_wait = True
        while need_to_wait:
            try:
                async with self.worker_lock:
                    self.worker.wait_for_subprocess_readines(block=False)

                need_to_wait = False
            except SubprocessIsNotReadyError:
                await i(Sleep, 0.1)

        self.worker_is_ready = True

    async def wait_for_worker_readyness(self, i: Interface):
        """Sleep in 0.1 s steps until ``worker_is_ready`` becomes true."""
        while not self.worker_is_ready:
            await i(Sleep, 0.1)

    def start(self, wr: Optional[TkObjWrapper] = None):
        """Put the readiness, sampling and statistics coroutines into the scheduler.

        Args:
            wr: when given, the coroutines are put through ``wr.put_coro``;
                otherwise through the ``PutCoro`` service of the current
                interface.
        """
        if self.i is None:
            self.i = current_interface()

        if wr is None:
            self.i(PutCoro, self._worker_readyness_waiting_coro)
            self.i(PutCoro, self._update)
            self.i(PutCoro, self._update_stats)
        else:
            wr.put_coro(self._worker_readyness_waiting_coro)
            wr.put_coro(self._update)
            wr.put_coro(self._update_stats)

    def stop(self):
        """Signal the coroutines to finish and stop the statistics subprocess.

        When no current interface can be obtained the worker is stopped with a
        direct call; otherwise the stop call is handed to
        ``run_in_thread_pool_fast``.
        """
        self._stop = True
        need_to_block = False
        try:
            i: Interface = current_interface()
        except Exception:
            # Only ordinary errors are treated as "no interface available";
            # KeyboardInterrupt and SystemExit keep propagating.
            need_to_block = True

        with self.worker_lock:
            if need_to_block:
                self.worker.stop()
            else:
                run_in_thread_pool_fast(i, self.worker.stop)

    def _update(self, i: Interface):
        """Record the scheduler's loop iteration delta time after every yield."""
        i(RunCoro, self.wait_for_worker_readyness)
        while not self._stop:
            i(Yield)
            loop: CoroSchedulerType = i._loop
            delta_time = loop.loop_iteration_delta_time
            # The external key feeds the view; the internal key feeds the statistics.
            self.fac(self.external_items_key, delta_time)
            self.fac(self.internal_items_key, delta_time)

    def _update_stats(self, i: Interface):
        """Every 0.2 s compute statistics over the sample window and publish them."""
        i(RunCoro, self.wait_for_worker_readyness)
        while not self._stop:
            try:
                data = i(FastAggregator, self.internal_items_key)
                if len(data) >= self.window_size:
                    # The new batch alone fills the window.
                    self.data_window = data[-self.window_size:]
                else:
                    # Top the new batch up with the newest previously seen samples.
                    additional_data_len = self.window_size - len(data)
                    if len(self.data_window) >= additional_data_len:
                        self.data_window = self.data_window[-additional_data_len:] + data
                    else:
                        self.data_window.extend(data)

                iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = self.compute_stats(i, self.data_window)

                # Maintain the lifetime extremes.
                if self.max_deviation is None:
                    self.max_deviation = max_deviation
                else:
                    self.max_deviation = max(self.max_deviation, max_deviation)

                if self.min_deviation is None:
                    self.min_deviation = min_deviation
                else:
                    self.min_deviation = min(self.min_deviation, min_deviation)

                if self.max_iter_per_sec is None:
                    self.max_iter_per_sec = iter_per_sec
                else:
                    self.max_iter_per_sec = max(self.max_iter_per_sec, iter_per_sec)

                if self.min_iter_per_sec is None:
                    self.min_iter_per_sec = iter_per_sec
                else:
                    self.min_iter_per_sec = min(self.min_iter_per_sec, iter_per_sec)

                self.fac(self.stats_key, (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation))
                self.fac(self.lifetime_stats_key, (self.max_iter_per_sec, self.min_iter_per_sec, self.max_deviation, self.min_deviation))

                # Resize the window to twice the measured iterations per second.
                self.window_size = 2 * int(round(iter_per_sec))
            except KeyError:
                # Deliberately ignored; presumably raised when nothing has been
                # aggregated under the key yet - TODO confirm.
                pass

            i(Sleep, 0.2)

    def compute_stats(self, i: Interface, data):
        """Send ``data`` to the subprocess and poll for its answer.

        Polls without blocking and sleeps 0.005 s between attempts. The worker
        lock is held for the whole exchange.

        Returns:
            The answer received from the subprocess.
        """
        with self.worker_lock:
            self.worker.send_data_to_subprocess(data)
            result = None
            got_result = False
            while not got_result:
                try:
                    result = self.worker.get_answer_from_subprocess(block=False)
                except Empty:
                    pass
                else:
                    got_result = True

                if not got_result:
                    i(Sleep, 0.005)

            return result

    @staticmethod
    def process_initializer(init_data):
        """Subprocess initializer; does nothing."""
        pass

    @staticmethod
    def process_worker(global_data, data):
        """Compute the statistics tuple for ``data`` inside the subprocess.

        Returns:
            ``(iter_per_sec, val_99, val_95, val_68, max_deviation,
            min_deviation)`` where ``iter_per_sec`` is the reciprocal of the
            average of ``data`` (0 when the average is 0).
        """
        from cengal.statistics.normal_distribution import count_99_95_68
        val_99, val_95, val_68, max_deviation, min_deviation = count_99_95_68(data)
        average_data = average(data)
        iter_per_sec = 0 if 0 == average_data else (1 / average_data)
        return (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)

    @staticmethod
    def item_formatter(data) -> str:
        """Return ``data`` formatted with an f-string."""
        return f'{data}'

    @staticmethod
    def stats_formatter(data) -> str:
        """Format the per-window statistics tuple, prefixed with the current coroutine id."""
        i: Interface = current_interface()
        iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = data
        return f'{i.coro_id}. Stats:\niter_per_sec: {iter_per_sec}\nval_99: {val_99}\nval_95: {val_95}\nval_68: {val_68}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'

    @staticmethod
    def lifetime_stats_formatter(data) -> str:
        """Format the lifetime statistics tuple, prefixed with the current coroutine id."""
        i: Interface = current_interface()
        max_iter_per_sec, min_iter_per_sec, max_deviation, min_deviation = data
        return f'{i.coro_id}. Lifetime Stats:\nmax_iter_per_sec: {max_iter_per_sec}\nmin_iter_per_sec: {min_iter_per_sec}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
SchedulerPerformanceFormatter( loop_iteration_delta_times_key: typing.Hashable, loop_iteration_delta_times_lifetime_stats_key: typing.Hashable, loop_iteration_delta_times_stats_key: typing.Hashable, window_size: int)
def __init__(self, loop_iteration_delta_times_key: Hashable, loop_iteration_delta_times_lifetime_stats_key: Hashable, loop_iteration_delta_times_stats_key: Hashable, window_size: int):
    """Remember the aggregator keys and launch the statistics subprocess.

    The three key arguments name the aggregator entries for raw delta times,
    lifetime statistics and per-window statistics; ``window_size`` is the
    initial number of samples in the statistics window.
    """
    # Aggregator keys; the internal one is unique per instance.
    self.external_items_key: Hashable = loop_iteration_delta_times_key
    self.internal_items_key: Hashable = f'internal_scheduler_tdelta__{uuid4()}'
    self.lifetime_stats_key: Hashable = loop_iteration_delta_times_lifetime_stats_key
    self.stats_key: Hashable = loop_iteration_delta_times_stats_key

    # Runtime state.
    self.window_size: int = window_size
    self.fac = FastAggregatorClient()
    self.i: Interface = None
    self.data_window: List = list()
    self.max_deviation: float = None
    self.min_deviation: float = None
    self.max_iter_per_sec: float = None
    self.min_iter_per_sec: float = None
    self._stop: bool = False

    # Subprocess worker that computes the statistics.
    worker_settings = SubprocessWorkerSettings()
    worker_settings.initiation_function = self.process_initializer
    worker_settings.working_function = self.process_worker
    worker_settings.transport = Transport.queue
    worker_settings.sendable_data_type = SendableDataType.marshalable

    self.worker_lock: Lock = Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}')
    self.worker: SubprocessWorker = SubprocessWorker(worker_settings)
    # Started without waiting; readiness is tracked via ``worker_is_ready``.
    self.worker.start(wait_for_process_readyness=False)
    self.worker_is_ready: bool = False
async def
wait_for_worker_readyness( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface):
def
start( self, wr: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.tkinter.versions.v_0.tkinter.TkObjWrapper, NoneType] = None):
def start(self, wr: Optional[TkObjWrapper] = None):
    """Schedule the readiness, sampling and statistics coroutines.

    With ``wr`` the coroutines are put through ``wr.put_coro``; without it
    they are put through the ``PutCoro`` service of the stored interface.
    The interface is taken from ``current_interface()`` on first use.
    """
    if self.i is None:
        self.i = current_interface()

    coros = (
        self._worker_readyness_waiting_coro,
        self._update,
        self._update_stats,
    )

    if wr is not None:
        for coro in coros:
            wr.put_coro(coro)

        return

    for coro in coros:
        self.i(PutCoro, coro)
def
compute_stats( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, data):
def compute_stats(self, i: Interface, data):
    """Hand ``data`` to the subprocess and return its answer.

    The answer is polled without blocking; between unsuccessful attempts the
    coroutine sleeps for 0.005 s. The worker lock is held throughout.
    """
    with self.worker_lock:
        self.worker.send_data_to_subprocess(data)
        while True:
            try:
                answer = self.worker.get_answer_from_subprocess(block=False)
            except Empty:
                pass
            else:
                break

            # Nothing available yet: let other coroutines run, then retry.
            i(Sleep, 0.005)

        return answer
@staticmethod
def
process_worker(global_data, data):
@staticmethod
def process_worker(global_data, data):
    """Compute the statistics tuple for one window of delta times.

    Returns ``(iter_per_sec, val_99, val_95, val_68, max_deviation,
    min_deviation)``; ``iter_per_sec`` is the reciprocal of the average of
    ``data``, or 0 when that average is 0.
    """
    from cengal.statistics.normal_distribution import count_99_95_68

    val_99, val_95, val_68, max_deviation, min_deviation = count_99_95_68(data)

    mean_delta = average(data)
    if 0 == mean_delta:
        iter_per_sec = 0
    else:
        iter_per_sec = 1 / mean_delta

    return (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)
@staticmethod
def
stats_formatter(data) -> str:
@staticmethod
def stats_formatter(data) -> str:
    """Render the six-item statistics tuple as text, prefixed with the current coroutine id."""
    interface: Interface = current_interface()
    iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = data
    return (
        f'{interface.coro_id}. Stats:\n'
        f'iter_per_sec: {iter_per_sec}\n'
        f'val_99: {val_99}\n'
        f'val_95: {val_95}\n'
        f'val_68: {val_68}\n'
        f'max_deviation: {max_deviation}\n'
        f'min_deviation: {min_deviation}'
    )
@staticmethod
def
lifetime_stats_formatter(data) -> str:
@staticmethod
def lifetime_stats_formatter(data) -> str:
    """Render the four-item lifetime statistics tuple as text, prefixed with the current coroutine id."""
    interface: Interface = current_interface()
    max_iter_per_sec, min_iter_per_sec, max_deviation, min_deviation = data
    return (
        f'{interface.coro_id}. Lifetime Stats:\n'
        f'max_iter_per_sec: {max_iter_per_sec}\n'
        f'min_iter_per_sec: {min_iter_per_sec}\n'
        f'max_deviation: {max_deviation}\n'
        f'min_deviation: {min_deviation}'
    )
def
scheduler_stats_aggregator_provider( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, fac_key: str = 'coro_scheduler_stats'):
def scheduler_stats_aggregator_provider(i: Interface, fac_key: str = 'coro_scheduler_stats'):
    """Publish the scheduler's entity stats under ``fac_key`` every 0.5 s.

    Runs forever: each pass asks the scheduler for its stats at the ``info``
    level and sends a one-entry ``{name: stats}`` dict to the aggregator.
    """
    scheduler: CoroSchedulerType = i._loop
    aggregator = FastAggregatorClient()
    while True:
        level: EntityStatsMixin.StatsLevel = EntityStatsMixin.StatsLevel.info
        entity_name, entity_stats = scheduler.get_entity_stats(level)
        aggregator(fac_key, {entity_name: entity_stats})
        i(Sleep, 0.5)
class
CSStatsFormatterMultiprocess:
class CSStatsFormatterMultiprocess:
    """Format scheduler stats in a subprocess after removing filtered paths.

    Calling an instance sends the stats to a ``SubprocessWorker``, which
    deletes every configured path from them and pretty-prints the rest.
    """

    def __init__(self, filtered_paths: List[Sequence[str]] = None):
        """Configure the filter and start the formatting subprocess.

        Args:
            filtered_paths: paths to remove from the stats before formatting.
                When ``None``, a built-in list of execution-time entries is
                used.
        """
        self.filtered_paths: List[Sequence[str]] = [
            ['CoroSchedulerBase', 'loop', 'coroutines execution times'],
            ['CoroSchedulerBase', 'loop', 'longest continuous execution time of coroutines'],
            ['CoroSchedulerGreenlet', 'loop', 'coroutines execution times'],
            ['CoroSchedulerGreenlet', 'loop', 'longest continuous execution time of coroutines'],
            ['CoroSchedulerAwaitable', 'loop', 'coroutines execution times'],
            ['CoroSchedulerAwaitable', 'loop', 'longest continuous execution time of coroutines'],
        ] if filtered_paths is None else filtered_paths
        self._stop: bool = False
        self._subprocess_started: bool = False
        settings = SubprocessWorkerSettings()
        settings.initiation_function = self.process_initializer
        settings.working_function = self.process_worker
        settings.transport = Transport.queue
        settings.sendable_data_type = SendableDataType.marshalable
        settings.use_internal_subprocess_input_buffer = True
        # Handed to ``process_initializer`` as its ``init_data`` argument.
        settings.initialization_data = {
            'filtered_paths': self.filtered_paths
        }

        self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
        self.worker: SubprocessWorker = SubprocessWorker(settings)
        # Readiness is polled later by ``_worker_readyness_waiting_coro``.
        self.worker.start(wait_for_process_readyness=False)
        self.worker_is_ready: bool = False

    async def _worker_readyness_waiting_coro(self, i: Interface):
        """Poll the subprocess every 0.1 s until it is ready, then set ``worker_is_ready``."""
        need_to_wait = True
        while need_to_wait:
            try:
                async with self.worker_lock:
                    self.worker.wait_for_subprocess_readines(block=False)

                need_to_wait = False
            except SubprocessIsNotReadyError:
                await i(Sleep, 0.1)

        self.worker_is_ready = True

    async def wait_for_worker_readyness(self, i: Interface):
        """Sleep in 0.1 s steps until ``worker_is_ready`` becomes true."""
        while not self.worker_is_ready:
            await i(Sleep, 0.1)

    def start(self, wr: Optional[TkObjWrapper] = None):
        """Put the readiness-waiting coroutine into the scheduler.

        Args:
            wr: when given, the coroutine is put through ``wr.put_coro``;
                otherwise through ``PutCoro`` on the current interface.
        """
        if wr is None:
            i = current_interface()
            i(PutCoro, self._worker_readyness_waiting_coro)
        else:
            wr.put_coro(self._worker_readyness_waiting_coro)

    def stop(self):
        """Set the stop flag and stop the formatting subprocess.

        When no current interface can be obtained the worker is stopped with a
        direct call; otherwise the stop call is handed to
        ``run_in_thread_pool_fast``.
        """
        self._stop = True
        need_to_block = False
        try:
            i: Interface = current_interface()
        except Exception:
            # Only ordinary errors are treated as "no interface available";
            # KeyboardInterrupt and SystemExit keep propagating.
            need_to_block = True

        with self.worker_lock:
            if need_to_block:
                self.worker.stop()
            else:
                run_in_thread_pool_fast(i, self.worker.stop)

    def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
        """Replace the filter list and, unless stopped, send it to the subprocess."""
        # The interface is not used further here; the lookup is kept because
        # it may raise when no interface is available.
        i: Interface = current_interface()
        self.filtered_paths = filtered_paths
        with self.worker_lock:
            if not self._stop:
                self.worker.send_data_to_subprocess({
                    'type': 'filter',
                    'data': self.filtered_paths
                })

    def __call__(self, data):
        """Return ``data`` formatted by the subprocess, prefixed with the coroutine id.

        Returns a placeholder string while the subprocess is not ready. When
        the formatter is stopped before a 'data' answer arrives, the stats
        part of the result is empty.
        """
        with self.worker_lock:
            if not self.worker_is_ready:
                return '<<Waiting for subprocess to start...>>'

            i: Interface = current_interface()
            self.worker.send_data_to_subprocess({
                'type': 'data',
                'data': data
            })
            result = None
            got_result = False
            while (not got_result) and (not self._stop):
                try:
                    result = self.worker.get_answer_from_subprocess(block=False)
                except Empty:
                    pass
                else:
                    got_result = True

                if got_result and ('data' != result['type']):
                    # Not the answer we are waiting for. Drop it so that a
                    # later stop cannot return its payload as stats.
                    result = None
                    got_result = False

                if not got_result:
                    i(Sleep, 0.01)

            result = '' if result is None else result['data']
            result = f'{i.coro_id}. CoroScheduler stats:\n{result}'
            return result

    @staticmethod
    def process_initializer(init_data) -> Any:
        """Subprocess initializer; returns ``init_data`` unchanged."""
        # Imported here but not used within this function.
        from pprintpp import pformat as pf
        return init_data

    @staticmethod
    def process_worker(global_data, data_msg):
        """Handle one message inside the subprocess.

        A 'filter' message stores its payload as the new filter list and
        produces no answer. A 'data' message has every filtered path removed
        from its payload, which is then pretty-printed and returned in the
        same message dict. Any other type returns ``None``.
        """
        data = data_msg['data']
        if 'filter' == data_msg['type']:
            if global_data is None:
                # NOTE(review): this rebinding is local to the call; whether
                # the new dict survives until the next message is not visible
                # here - confirm against SubprocessWorker.
                global_data = dict()

            global_data['filtered_paths'] = data
            return None  # no answer to parent process
        elif 'data' == data_msg['type']:
            # Format unfiltered rather than fail when no filter list exists.
            filtered_paths = global_data.get('filtered_paths', ()) if global_data else ()
            for path in filtered_paths:
                try_del_dict_item(data, path)

            data = pf(data, indent=4, width=120)
            data_msg['data'] = data
            return data_msg
CSStatsFormatterMultiprocess(filtered_paths: typing.List[typing.Sequence[str]] = None)
def __init__(self, filtered_paths: List[Sequence[str]] = None):
    """Configure the path filter and launch the formatting subprocess.

    When ``filtered_paths`` is ``None`` a built-in list is used: for each of
    the three scheduler class names, the two execution-time entries under
    ``'loop'``.
    """
    if filtered_paths is None:
        scheduler_names = ('CoroSchedulerBase', 'CoroSchedulerGreenlet', 'CoroSchedulerAwaitable')
        hidden_entries = ('coroutines execution times', 'longest continuous execution time of coroutines')
        filtered_paths = [
            [scheduler_name, 'loop', entry]
            for scheduler_name in scheduler_names
            for entry in hidden_entries
        ]

    self.filtered_paths: List[Sequence[str]] = filtered_paths
    self._stop: bool = False
    self._subprocess_started: bool = False

    worker_settings = SubprocessWorkerSettings()
    worker_settings.initiation_function = self.process_initializer
    worker_settings.working_function = self.process_worker
    worker_settings.transport = Transport.queue
    worker_settings.sendable_data_type = SendableDataType.marshalable
    worker_settings.use_internal_subprocess_input_buffer = True
    # Handed to ``process_initializer`` as its ``init_data`` argument.
    worker_settings.initialization_data = {'filtered_paths': self.filtered_paths}

    self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
    self.worker: SubprocessWorker = SubprocessWorker(worker_settings)
    # Started without waiting; readiness is tracked via ``worker_is_ready``.
    self.worker.start(wait_for_process_readyness=False)
    self.worker_is_ready: bool = False
async def
wait_for_worker_readyness( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface):
def
start( self, wr: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.tkinter.versions.v_0.tkinter.TkObjWrapper, NoneType] = None):
def
update_filtered_paths(self, filtered_paths: typing.List[typing.Sequence[str]]):
def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
    """Remember a new filter list and push it to the worker subprocess.

    The new list is always stored on the instance; it is sent to the worker
    (as a ``'filter'`` message) only while the formatter has not been stopped.

    Args:
        filtered_paths: Replacement list of key paths to filter out.
    """
    # NOTE(review): the interface is fetched but not used below; kept in case
    # the call itself matters in this context - confirm before removing.
    i: Interface = current_interface()
    self.filtered_paths = filtered_paths
    with self.worker_lock:
        if self._stop:
            return

        filter_message = {
            'type': 'filter',
            'data': self.filtered_paths
        }
        self.worker.send_data_to_subprocess(filter_message)
@staticmethod
def
process_worker(global_data, data_msg):
384 @staticmethod 385 def process_worker(global_data, data_msg): 386 data = data_msg['data'] 387 if 'filter' == data_msg['type']: 388 if global_data is None: 389 global_data = dict() 390 391 global_data['filtered_paths'] = data 392 return None # no answer to parent process 393 elif 'data' == data_msg['type']: 394 filtered_paths = global_data['filtered_paths'] 395 for path in filtered_paths: 396 try_del_dict_item(data, path) 397 398 data = pf(data, indent=4, width=120) 399 data_msg['data'] = data 400 return data_msg
class
CorosLogsProvider:
class CorosLogsProvider:
    """Collect log records and forward them in batches to a fast-aggregator key.

    The instance registers itself as an iteration handler through
    ``LogRequest().add_iteration_handler(self)``; its ``__call__`` queues the
    records it is given. A background coroutine (``_update``) drains that queue
    every ``period`` seconds and hands each batch to the ``FastAggregatorClient``
    under ``coros_logs_key``.
    """

    def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
        """Store the configuration and reset the runtime state.

        Args:
            coros_logs_key: Aggregator key under which batches are published.
            logs_limit: Maximum number of records kept per batch; ``None``
                means no limit.
            period: Pause, in seconds, between two queue flushes.
            force_stop_timeout: Timeout passed to ``agraceful_coro_destroyer``
                when the update coroutine is being stopped.
        """
        self.coros_logs_key: Hashable = coros_logs_key
        self.logs_limit: Union[None, int] = logs_limit
        self.period: float = period
        self.i: Interface = None
        self._update_coro_id: CoroID = None
        self.force_stop_timeout: RationalNumber = force_stop_timeout
        self.fac = FastAggregatorClient()
        self._stop = False
        self.lifetime_stats: Dict[CoroID, float] = dict()
        # False until the first handler call, which also pulls stored history.
        self._current_logs_taken: bool = False
        # Batches queued by __call__ and not yet sent by _update.
        self._unsend_data: List[List[Tuple[Tuple, Dict]]] = list()
        self._log_service_handler_was_added: bool = False

    def start(self):
        """Schedule the periodic update coroutine via ``PutCoro``."""
        if self.i is None:
            self.i = current_interface()

        self._update_coro_id = self.i(PutCoro, self._update)

    def stop(self):
        """Request the update loop to end and schedule the cleanup coroutine."""
        self._stop = True
        current_interface()(PutCoro, self._force_stop)

    async def _force_stop(self, i: Interface):
        """Destroy the update coroutine, unregister the handler, drop the queue."""
        if self._update_coro_id is not None:
            # Clear the id first so a repeated call does not destroy it twice.
            update_coro_id = self._update_coro_id
            self._update_coro_id = None
            await agraceful_coro_destroyer(i, self.force_stop_timeout, update_coro_id)

        if self._log_service_handler_was_added:
            self._log_service_handler_was_added = False
            await i(LogRequest().remove_iteration_handler(self))

        self._unsend_data = type(self._unsend_data)()

    async def _update(self, i: Interface):
        """Flush queued batches to the aggregator every ``period`` seconds."""
        while not self._stop:
            if not self._log_service_handler_was_added:
                # Register lazily: only once the Log service is known to the loop.
                if i._loop.is_service_registered(Log):
                    self._log_service_handler_was_added = True
                    await i(LogRequest().add_iteration_handler(self))

            # Swap the queue out before sending so new records land in a fresh one.
            logs_parts = self._unsend_data
            self._unsend_data = type(self._unsend_data)()
            for logs_part in logs_parts:
                self.fac(self.coros_logs_key, logs_part)

            await i(Sleep, self.period)

    def __call__(self, log_service: Log, data: List[Tuple[Tuple, Dict, Dict[str, Any]]], current_time: float, current_time_str: str):
        """Queue one batch of log records, honouring ``logs_limit``.

        On the very first call the batch is topped up with stored history from
        ``log_service.get_last_n_logs`` (everything when there is no limit,
        otherwise only as many records as still fit). On every call a batch
        longer than the limit is cut down to its most recent ``logs_limit``
        records. Empty batches are never queued.

        Args:
            log_service: Object providing ``get_last_n_logs``.
            data: New log records.
            current_time: Not used by this handler.
            current_time_str: Not used by this handler.
        """
        first_batch: bool = not self._current_logs_taken
        self._current_logs_taken = True
        limit = self.logs_limit

        if limit is None:
            batch = (log_service.get_last_n_logs(None) + data) if first_batch else data
        else:
            missing: int = limit - len(data)
            if missing < 0:
                # Too many records: keep the newest `limit`. A zero limit needs
                # its own case because `data[-0:]` is the whole list.
                batch = data[-limit:] if limit else data[:0]
            elif first_batch and missing > 0:
                # Room left on the first batch: prepend just enough history.
                # This is the only append for this case (no second copy of `data`).
                batch = log_service.get_last_n_logs(missing) + data
            else:
                batch = data

        if batch:
            self._unsend_data.append(batch)
CorosLogsProvider( coros_logs_key: typing.Hashable, logs_limit: typing.Union[NoneType, int], period: float, force_stop_timeout: typing.Union[int, float] = 0.1)
def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
    """Store the provider configuration and reset its runtime state.

    Args:
        coros_logs_key: Key under which log batches are published.
        logs_limit: Maximum number of records per batch, or ``None``.
        period: Pause, in seconds, used by the update loop.
        force_stop_timeout: Timeout used when the update coroutine is stopped.
    """
    # Caller-supplied configuration.
    self.coros_logs_key: Hashable = coros_logs_key
    self.logs_limit: Union[None, int] = logs_limit
    self.period: float = period
    self.force_stop_timeout: RationalNumber = force_stop_timeout

    # Scheduler bindings; both stay unset at construction time.
    self.i: Interface = None
    self._update_coro_id: CoroID = None

    # Lifecycle flags.
    self._stop = False
    self._current_logs_taken: bool = False
    self._log_service_handler_was_added: bool = False

    # Containers.
    self.lifetime_stats: Dict[CoroID, float] = {}
    self._unsend_data: List[List[Tuple[Tuple, Dict]]] = []

    self.fac = FastAggregatorClient()
class
CoroLogsAppendFormatter(cengal.user_interface.gui.tkinter.components.aggregator_view.versions.v_0.aggregator_view.AggregatorAppendFormatter):
class CoroLogsAppendFormatter(AggregatorAppendFormatter):
    """Append-formatter that renders log records as text blocks.

    Which sections of a record are rendered is controlled by ``desired_parts``,
    a combination of ``CoroLogsAppendFormatterParts`` flags.
    """

    def __init__(self, initial_text: str, desired_parts: CoroLogsAppendFormatterParts = CoroLogsAppendFormatterParts.all) -> None:
        """Initialise the base formatter and remember which parts to render.

        Args:
            initial_text: Passed through to the base class constructor.
            desired_parts: Flags selecting the sections emitted per record.
        """
        super().__init__(initial_text)
        self.desired_parts: CoroLogsAppendFormatterParts = desired_parts
        # Separator line that opens every rendered record.
        self._prompt_string: str = f'>>> {"-"*80}'

    def __call__(self, data: List[Tuple[Tuple, Dict, Dict[str, Any]]]) -> Any:
        """Render a batch of records and pass the text to the base formatter.

        Records are separated by a blank line and the batch is wrapped in
        newlines; an empty batch is forwarded as an empty string.
        """
        if not data:
            return super().__call__(str())

        rendered_records = [self._format(*record) for record in data]
        batch_text = '\n\n'.join(rendered_records)
        return super().__call__(f'\n{batch_text}\n')

    def _format(self, args, kwargs, info=None) -> str:
        """Render one record.

        Without ``info`` only the prompt line and the message are produced.
        With ``info`` each section is emitted only when its flag is present in
        ``desired_parts``, in a fixed order.
        """
        if info is None:
            return f'{self._prompt_string}\n{args_kwargs_to_str(args, kwargs)}'

        parts = CoroLogsAppendFormatterParts
        wanted = self.desired_parts
        lines: List[str] = []

        if parts.prompt_string in wanted:
            lines.append(self._prompt_string)

        if parts.time in wanted:
            lines.append(f'> Time: {info[InfoFields.current_time]}; Perf Counter: {info[InfoFields.perf_counter_time]:17.6f}')

        if parts.caller_info in wanted:
            lines.append(f'> λ: {info[InfoFields.caller_info]}')

        # The logging level is the only field looked up with a presence check.
        if parts.logging_level in wanted and InfoFields.logging_level in info:
            lines.append(f'> Logging level: {info[InfoFields.logging_level]}')

        if parts.log in wanted:
            lines.append(args_kwargs_to_str(args, kwargs))

        if parts.coros_traceback in wanted:
            parent_lines: List[str] = info[InfoFields.coro_parents_strings]
            if parent_lines:
                parents_text: str = '\n'.join(parent_lines)
                lines.append(f'> Coros traceback:\n{parents_text}')

        if parts.file_name_and_line_number in wanted:
            lines.append(f'> @ {info[InfoFields.file_name]}:{info[InfoFields.line_number]}')

        if parts.traceback in wanted:
            frames: List[str] = info[InfoFields.traceback_strings]
            if frames:
                frames_text: str = '\n'.join(frames).strip('\n')
                lines.append(f'> Traceback:\n{frames_text}')

        return '\n'.join(lines)
CoroLogsAppendFormatter( initial_text: str, desired_parts: CoroLogsAppendFormatterParts = <CoroLogsAppendFormatterParts.all: 255>)
desired_parts: CoroLogsAppendFormatterParts
Inherited Members
- cengal.user_interface.gui.tkinter.components.aggregator_view.versions.v_0.aggregator_view.AggregatorAppendFormatter
- initial_text
- initiated
- reset
class
CoroLogsAppendFormatterParts(enum.IntFlag):
class CoroLogsAppendFormatterParts(IntFlag):
    """Bit flags naming the sections of a formatted log record.

    Flags combine with ``|``; ``all`` is the union of every section flag and
    ``none`` is the empty selection.
    """

    none = 0
    prompt_string = 1 << 0
    time = 1 << 1
    caller_info = 1 << 2
    logging_level = 1 << 3
    log = 1 << 4
    coros_traceback = 1 << 5
    file_name_and_line_number = 1 << 6
    traceback = 1 << 7
    all = (
        prompt_string
        | time
        | caller_info
        | logging_level
        | log
        | coros_traceback
        | file_name_and_line_number
        | traceback
    )
An enumeration.
none =
<CoroLogsAppendFormatterParts.none: 0>
prompt_string =
<CoroLogsAppendFormatterParts.prompt_string: 1>
time =
<CoroLogsAppendFormatterParts.time: 2>
caller_info =
<CoroLogsAppendFormatterParts.caller_info: 4>
logging_level =
<CoroLogsAppendFormatterParts.logging_level: 8>
log =
<CoroLogsAppendFormatterParts.log: 16>
coros_traceback =
<CoroLogsAppendFormatterParts.coros_traceback: 32>
file_name_and_line_number =
<CoroLogsAppendFormatterParts.file_name_and_line_number: 64>
traceback =
<CoroLogsAppendFormatterParts.traceback: 128>
all =
<CoroLogsAppendFormatterParts.all: 255>
Inherited Members
- builtins.int
- conjugate
- bit_length
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
- enum.Enum
- name
- value