cengal.parallel_execution.coroutines.coro_tools.loop_administration.admin_tk.versions.v_0.admin_tk

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19__all__ = ['start_admin', 'cs_init', 'SchedulerPerformanceFormatter', 'scheduler_stats_aggregator_provider', 'CSStatsFormatterMultiprocess',
  20           'CorosLogsProvider', 'CoroLogsAppendFormatter', 'CoroLogsAppendFormatterParts']
  21
  22
  23"""
  24Module Docstring
  25Docstrings: http://www.python.org/dev/peps/pep-0257/
  26"""
  27
  28__author__ = "ButenkoMS <gtalk@butenkoms.space>"
  29__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
  30__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
  31__license__ = "Apache License, Version 2.0"
  32__version__ = "4.4.1"
  33__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
  34__email__ = "gtalk@butenkoms.space"
  35# __status__ = "Prototype"
  36__status__ = "Development"
  37# __status__ = "Production"
  38
  39
  40from cengal.time_management.cpu_clock_cycles import perf_counter
  41import ttkbootstrap as ttkb
  42from ttkbootstrap.scrolled import ScrolledText as TtkbScrolledText
  43from tkinter import simpledialog
  44import tkinter as tk
  45from ttkbootstrap import Style
  46from pprintpp import pformat as pf
  47from typing import Callable, Hashable, Optional, Set, Dict, Tuple, Union, List, Sequence, Any, cast
  48
  49from cengal.parallel_execution.coroutines.coro_scheduler import *
  50from cengal.parallel_execution.coroutines.coro_standard_services.tkinter import *
  51from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro, PutCoroRequest
  52from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
  53from cengal.parallel_execution.coroutines.coro_standard_services.sleep import Sleep
  54from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield
  55from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import AsyncioLoopRequest, run_in_thread_pool, run_in_thread_pool_fast
  56from cengal.parallel_execution.coroutines.coro_standard_services.fast_aggregator import *
  57from cengal.parallel_execution.coroutines.coro_standard_services.log import *
  58from cengal.parallel_execution.coroutines.coro_standard_services.instance import InstanceRequest
  59from cengal.parallel_execution.coroutines.coro_tools.coro_flow_control import graceful_coro_destroyer, agraceful_coro_destroyer, GracefulCoroDestroy
  60from cengal.parallel_execution.coroutines.coro_tools.lock import Lock
  61from cengal.statistics.normal_distribution import average
  62from cengal.parallel_execution.multiprocess.multiprocessing_task_runner import *
  63from cengal.text_processing.text_processing import normalize_line_separators_and_tabs
  64from cengal.introspection.inspect import get_exception, exception_to_printable_text
  65from cengal.system import OS_TYPE
  66from cengal.data_manipulation.dict_path import *
  67from cengal.user_interface.gui.tkinter.components.read_only_text import *
  68from cengal.user_interface.gui.tkinter.components.tool_tip import *
  69from cengal.user_interface.gui.tkinter.components.aggregator_view import *
  70from cengal.math.numbers import RationalNumber
  71from cengal.code_flow_control.args_manager import args_kwargs_to_str, ArgsKwargs
  72from greenlet import GreenletExit
  73from enum import IntFlag, auto
  74from uuid import uuid4
  75
  76
  77command_executor_aggregator_view_key = 'command_executor_aggregator'
  78command_executor_aggregator_append_view_key = 'command_executor_aggregator_append'
  79
  80coro_holder_header = 'async def coro_holder(i: Interface, fac: FastAggregatorClient, aggregator_key, aggregator_append_key):'
  81coro_holder_template = f'''from cengal.parallel_execution.coroutines.coro_scheduler import *
  82from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
  83from cengal.parallel_execution.coroutines.coro_standard_services.put_coro_list import *
  84from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro import *
  85from cengal.parallel_execution.coroutines.coro_standard_services.kill_coro_list import *
  86from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import *
  87from cengal.parallel_execution.coroutines.coro_standard_services.sleep import *
  88from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro import *
  89from cengal.parallel_execution.coroutines.coro_standard_services.throw_coro_list import *
  90from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import *
  91from cengal.parallel_execution.coroutines.coro_standard_services.shutdown_loop import *
  92from cengal.parallel_execution.coroutines.coro_standard_services.fast_aggregator import *
  93from cengal.parallel_execution.coroutines.coro_standard_services.log import *
  94from cengal.parallel_execution.coroutines.coro_tools.coro_flow_control import graceful_coro_destroyer, GracefulCoroDestroy
  95import asyncio
  96
  97
  98{coro_holder_header}
  99{{coro_body}}
 100
 101
 102result = i(RunCoro, coro_holder, FastAggregatorClient(), aggregator_key, aggregator_append_key)
 103'''
 104
 105
 106def prepare_coro_body(text: str) -> str:
 107    if (not text) or ('\n' == text):
 108        text = 'pass'
 109    
 110    text = normalize_line_separators_and_tabs(text)
 111    text_lines = text.splitlines()
 112    return '\n'.join([' ' * 4 + line for line in text_lines])
 113
 114
 115class CommandExecutor(ttkb.Frame):
 116    def __init__(self, width, height, allow_asyncio: bool = True, priority: Optional[CoroPriority] = None, interrupt_when_no_requests: Optional[bool] = None, *args, **kwargs) -> None:
 117        self.width = width
 118        self.height = height
 119        self.allow_asyncio = allow_asyncio
 120        self.priority = priority
 121        self.interrupt_when_no_requests = interrupt_when_no_requests
 122        self.current_coro_id: CoroID = None
 123        if allow_asyncio:
 124            i = current_interface()
 125            i(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=interrupt_when_no_requests, priority=priority))
 126            i(AsyncioLoopRequest().turn_on_loops_intercommunication())
 127
 128        super().__init__(*args, **kwargs)
 129        text_area_config = {
 130            'highlightcolor': Style.instance.colors.primary,
 131            'highlightbackground': Style.instance.colors.border,
 132            'highlightthickness': 1,
 133            'wrap': 'none',
 134        }
 135        if width is not None:
 136            text_area_config.update({
 137                'width': (width - 20) >> 1,
 138            })
 139        
 140        if height is not None:
 141            text_area_config.update({
 142                'height': height,
 143            })
 144        
 145        self.text_in_area = TtkbScrolledText(self, **text_area_config)
 146
 147        if 'Darwin' == OS_TYPE:
 148            self.control_key_name = 'Cmd'
 149        else:
 150            self.control_key_name = 'Ctrl'
 151
 152        self.buttons_frame = ttkb.Frame(self)
 153        self.run_button_text = '>>'
 154        self.run_button_width = len(self.run_button_text) + 1
 155        self.run_button = ttkb.Button(self.buttons_frame, text=self.run_button_text, width=self.run_button_width, command=self.run)
 156        self.run_button_tooltip = ToolTipHovered(self.run_button, f'<{self.control_key_name}+Enter> Run and clear input field. Prevents human missclicks')
 157
 158        self.run_button_2_text = 'R>'
 159        self.run_button_2_width = len(self.run_button_2_text) + 1
 160        self.run_button_2 = ttkb.Button(self.buttons_frame, text=self.run_button_2_text, width=self.run_button_2_width, command=self.run_2)
 161        self.run_button_2_tooltip = ToolTipHovered(self.run_button_2, f'<{self.control_key_name}+Alt+Enter> Run')
 162
 163        self.stop_button_text = 'X'
 164        self.stop_button_width = len(self.stop_button_text) + 1
 165        self.stop_button = ttkb.Button(self.buttons_frame, text=self.stop_button_text, width=self.stop_button_width, command=self.kill)
 166        self.stop_button['state'] = 'disabled'
 167        self.stop_button_tooltip = ToolTipHovered(self.stop_button, f'<{self.control_key_name}+x> Stop execution')
 168
 169        self.text_out_area = ReadOnlyText(self, **text_area_config)
 170
 171
 172        self.func_header_read_only = ttkb.Label(self, text=coro_holder_header, font=('Helvetica', 8, 'bold'))
 173        self.func_header_read_only.pack(fill='x', expand='yes', side='top')
 174
 175        self.text_in_area.pack(fill='both', side='left')
 176        self.run_button.pack(fill='both', side='top', pady=1)
 177        self.run_button_2.pack(fill='both', side='top', pady=1)
 178        self.stop_button.pack(fill='both', side='top', pady=1)
 179        self.buttons_frame.pack(fill='both', side='left')
 180        self.text_out_area.pack(fill='both', expand='yes', side='left')
 181
 182        self.text_in_area._text.bind("<Control-Return>", self.ctrl_enter)
 183        self.text_in_area._text.bind("<Control-Alt-Return>", self.ctrl_alt_enter)
 184        self.text_in_area._text.bind("<Control-x>", self.ctrl_x)
 185
 186    def ctrl_enter(self, event):
 187        self.run()
 188        return 'break'
 189
 190    def ctrl_alt_enter(self, event):
 191        self.run_2()
 192        return 'break'
 193
 194    def ctrl_x(self, event):
 195        self.kill()
 196        return 'break'
 197
 198    def run(self, clear_input: bool = True):
 199        text = self.text_in_area._text.get('1.0', 'end')
 200        if clear_input:
 201            self.text_in_area._text.delete('1.0', 'end')
 202        
 203        self.text_out_area._text.insert('end', text)
 204        
 205        # disable the intput
 206        self.run_button['text'] = '...'
 207        self.run_button_width = len(self.run_button['text']) + 1
 208        self.run_button['state'] = 'disabled'
 209        self.run_button_2['text'] = '...'
 210        self.run_button_2_width = len(self.run_button_2['text']) + 1
 211        self.run_button_2['state'] = 'disabled'
 212        self.stop_button['state'] = 'normal'
 213        self.text_in_area._text['state'] = 'disabled'
 214        self.text_in_area.state(["disabled"])
 215        
 216        self.current_coro_id = current_interface()(PutCoro, self.eval_text, text)
 217    
 218    def run_2(self):
 219        return self.run(clear_input = False)
 220
 221    def kill(self):
 222        if self.current_coro_id is not None:
 223            graceful_coro_destroyer(current_interface(), None, self.current_coro_id, first_phase_is_wait=False)
 224            self.current_coro_id = None
 225            self.run_button['text'] = self.run_button_text
 226            self.run_button_width = self.run_button_width
 227            self.run_button['state'] = 'normal'
 228            self.run_button_2['text'] = self.run_button_2_text
 229            self.run_button_2_width = self.run_button_2_width
 230            self.run_button_2['state'] = 'normal'
 231            self.stop_button['state'] = 'disabled'
 232            self.text_in_area._text['state'] = 'normal'
 233            self.text_in_area.state(["!disabled"])
 234    
 235    def eval_text(self, i: Interface, text):
 236        self.text_out_area._text.delete('1.0', 'end')
 237
 238        # evalueate the text
 239        try:
 240            result = None
 241            local_vars = {
 242                'i': i,
 243                'result': result,
 244                'aggregator_key': command_executor_aggregator_view_key,
 245                'aggregator_append_key': command_executor_aggregator_append_view_key,
 246            }
 247            text = coro_holder_template.format(coro_body=prepare_coro_body(text))
 248            exec(text, local_vars)
 249            self.text_out_area._text.insert('1.0', f'{local_vars["result"]}')
 250        except GracefulCoroDestroy as ex:
 251            # self.text_out_area._text.insert('1.0', f'{type(ex)}')
 252            pass
 253        except GreenletExit as ex:
 254            # self.text_out_area._text.insert('1.0', f'{type(ex)}')
 255            pass
 256        except:
 257            self.text_out_area._text.insert('1.0', f'{exception_to_printable_text(get_exception())}')
 258        finally:
 259            # enable the intput
 260            self.run_button['text'] = self.run_button_text
 261            self.run_button_width = len(self.run_button['text']) + 1
 262            self.run_button['state'] = 'normal'
 263            self.run_button_2['text'] = self.run_button_2_text
 264            self.run_button_2_width = len(self.run_button_2['text']) + 1
 265            self.run_button_2['state'] = 'normal'
 266            self.stop_button['state'] = 'disabled'
 267            self.text_in_area._text['state'] = 'normal'
 268            self.text_in_area.state(["!disabled"])
 269
 270
 271class CSStatsFormatterMultiprocess:
 272    def __init__(self, filtered_paths: List[Sequence[str]] = None):
 273        self.filtered_paths: List[Sequence[str]] = [
 274            ['CoroSchedulerBase', 'loop', 'coroutines execution times'],
 275            ['CoroSchedulerBase', 'loop', 'longest continuous execution time of coroutines'],
 276            ['CoroSchedulerGreenlet', 'loop', 'coroutines execution times'],
 277            ['CoroSchedulerGreenlet', 'loop', 'longest continuous execution time of coroutines'],
 278            ['CoroSchedulerAwaitable', 'loop', 'coroutines execution times'],
 279            ['CoroSchedulerAwaitable', 'loop', 'longest continuous execution time of coroutines'],
 280        ] if filtered_paths is None else filtered_paths
 281        self._stop: bool = False
 282        self._subprocess_started: bool = False
 283        settings = SubprocessWorkerSettings()
 284        settings.initiation_function = self.process_initializer
 285        settings.working_function = self.process_worker
 286        settings.transport = Transport.queue
 287        settings.sendable_data_type = SendableDataType.marshalable
 288        settings.use_internal_subprocess_input_buffer = True
 289        settings.initialization_data = {
 290            'filtered_paths': self.filtered_paths
 291        }
 292        
 293        self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
 294        self.worker: SubprocessWorker = SubprocessWorker(settings)
 295        self.worker.start(wait_for_process_readyness=False)
 296        self.worker_is_ready: bool = False
 297    
 298    async def _worker_readyness_waiting_coro(self, i: Interface):
 299        need_to_wait = True
 300        while need_to_wait:
 301            try:
 302                async with self.worker_lock:
 303                    self.worker.wait_for_subprocess_readines(block=False)
 304
 305                need_to_wait = False
 306            except SubprocessIsNotReadyError:
 307                await i(Sleep, 0.1)
 308        
 309        self.worker_is_ready = True
 310    
 311    async def wait_for_worker_readyness(self, i: Interface):
 312        while not self.worker_is_ready:
 313            await i(Sleep, 0.1)
 314    
 315    def start(self, wr: Optional[TkObjWrapper] = None):
 316        if wr is None:
 317            i = current_interface()
 318            i(PutCoro, self._worker_readyness_waiting_coro)
 319        else:
 320            wr.put_coro(self._worker_readyness_waiting_coro)
 321    
 322    def stop(self):
 323        self._stop = True
 324        need_to_block = False
 325        try:
 326            i: Interface = current_interface()
 327        except:
 328            need_to_block = True
 329        
 330        with self.worker_lock:
 331            if need_to_block:
 332                self.worker.stop()
 333            else:
 334                run_in_thread_pool_fast(i, self.worker.stop)
 335        
 336    def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
 337        i: Interface = current_interface()
 338        self.filtered_paths = filtered_paths
 339        with self.worker_lock:
 340            if not self._stop:
 341                self.worker.send_data_to_subprocess({
 342                    'type': 'filter',
 343                    'data': self.filtered_paths
 344                })
 345    
 346    def __call__(self, data):
 347        with self.worker_lock:
 348            if not self.worker_is_ready:
 349                # return data
 350                return '<<Waiting for subprocess to start...>>'
 351            
 352            i: Interface = current_interface()
 353            self.worker.send_data_to_subprocess({
 354                'type': 'data',
 355                'data': data
 356            })
 357            result = None
 358            got_result = False
 359            while (not got_result) and (not self._stop):
 360                try:
 361                    result = self.worker.get_answer_from_subprocess(block=False)
 362                except Empty:
 363                    pass
 364                else:
 365                    got_result = True
 366                
 367                if got_result:
 368                    if 'data' != result['type']:
 369                        got_result = False
 370                
 371                if not got_result:
 372                    i(Sleep, 0.01)
 373            
 374            result = '' if result is None else result['data']
 375            result = f'{i.coro_id}. CoroScheduler stats:\n{result}'
 376            return result
 377    
 378    @staticmethod
 379    def process_initializer(init_data) -> Any:
 380        from pprintpp import pformat as pf
 381        return init_data
 382    
 383    @staticmethod
 384    def process_worker(global_data, data_msg):
 385        data = data_msg['data']
 386        if 'filter' == data_msg['type']:
 387            if global_data is None:
 388                global_data = dict()
 389            
 390            global_data['filtered_paths'] = data
 391            return None  # no answer to parent process
 392        elif 'data' == data_msg['type']:
 393            filtered_paths = global_data['filtered_paths']
 394            for path in filtered_paths:
 395                try_del_dict_item(data, path)
 396            
 397            data = pf(data, indent=4, width=120)
 398            data_msg['data'] = data
 399            return data_msg
 400
 401
 402class CorosMaxExecutionTimesProvider:
 403    def __init__(self, lifetime_stats_key: Hashable, stats_key: Hashable, period: float):
 404        self.lifetime_stats_key: Hashable = lifetime_stats_key
 405        self.stats_key: Hashable = stats_key
 406        self.period: float = period
 407        self.i: Interface = None
 408        self.fac = FastAggregatorClient()
 409        self._stop = False
 410        self.lifetime_stats: Dict[CoroID, float] = dict()
 411    
 412    def start(self):
 413        if self.i is None:
 414            self.i = current_interface()
 415        
 416        self.i(PutCoro, self._update)
 417    
 418    def stop(self):
 419        self._stop = True
 420    
 421    async def _update(self, i: Interface):
 422        while not self._stop:
 423            data = copy(i._loop.coro_longest_execution_time)
 424            new_coro_longest_execution_time = dict()
 425            for key, value in data.items():
 426                new_coro_longest_execution_time[key] = 0.0
 427                
 428                if key not in self.lifetime_stats:
 429                    self.lifetime_stats[key] = 0.0
 430                    
 431                self.lifetime_stats[key] = max(self.lifetime_stats[key], value)
 432            
 433            i._loop.coro_longest_execution_time = new_coro_longest_execution_time
 434            self.fac(self.lifetime_stats_key, self.lifetime_stats)
 435            self.fac(self.stats_key, data)
 436            await i(Sleep, self.period)
 437    
 438    def _formatter(self, title: str, data):
 439        i: Interface = current_interface()
 440        return f'{i.coro_id}. {title}:\n' + '\n'.join([f'{coro_id}: {value * 1000}' for coro_id, value in data.items()])
 441    
 442    def stats_formatter(self, data):
 443        return self._formatter('Coros Max Execution Times', data)
 444    
 445    def lifetime_stats_formatter(self, data):
 446        return self._formatter('Coros Lifetime Max Execution Times', data)
 447
 448
 449class CoroLogsAppendFormatterParts(IntFlag):
 450    none = 0
 451    prompt_string = auto()
 452    time = auto()
 453    caller_info = auto()
 454    logging_level = auto()
 455    log = auto()
 456    coros_traceback = auto()
 457    file_name_and_line_number = auto()
 458    traceback = auto()
 459    all = prompt_string | time | caller_info | logging_level | log | coros_traceback | file_name_and_line_number | traceback
 460
 461
 462class CoroLogsAppendFormatter(AggregatorAppendFormatter):
 463    def __init__(self, initial_text: str, desired_parts: CoroLogsAppendFormatterParts = CoroLogsAppendFormatterParts.all) -> None:
 464        super().__init__(initial_text)
 465        self._prompt_string: str = f'>>> {"-"*80}'
 466        self.desired_parts: CoroLogsAppendFormatterParts = desired_parts
 467
 468    def __call__(self, data: List[Tuple[Tuple, Dict, Dict[str, Any]]]) -> Any:
 469        if data:
 470            data_part = '\n\n'.join([self._format(*log_info) for log_info in data])
 471            return super().__call__(f'\n{data_part}\n')
 472        else:
 473            return super().__call__(str())
 474
 475    def _format(self, args, kwargs, info=None) -> str:
 476        if info is None:
 477            return f'{self._prompt_string}\n{args_kwargs_to_str(args, kwargs)}'
 478        else:
 479            output_strings: List[str] = list()
 480            if CoroLogsAppendFormatterParts.prompt_string in self.desired_parts:
 481                output_strings.append(self._prompt_string)
 482
 483            if CoroLogsAppendFormatterParts.time in self.desired_parts:
 484                output_strings.append(f'> Time: {info[InfoFields.current_time]}; Perf Counter: {info[InfoFields.perf_counter_time]:17.6f}')
 485
 486            if CoroLogsAppendFormatterParts.caller_info in self.desired_parts:
 487                output_strings.append(f'> λ: {info[InfoFields.caller_info]}')
 488
 489            if CoroLogsAppendFormatterParts.logging_level in self.desired_parts:
 490                if InfoFields.logging_level in info:
 491                    output_strings.append(f'> Logging level: {info[InfoFields.logging_level]}')
 492            
 493            if CoroLogsAppendFormatterParts.log in self.desired_parts:
 494                output_strings.append(args_kwargs_to_str(args, kwargs))
 495
 496            if CoroLogsAppendFormatterParts.coros_traceback in self.desired_parts:
 497                coro_parents_strings: List[str] = info[InfoFields.coro_parents_strings]
 498                if coro_parents_strings:
 499                    coro_parents_text: str = '\n'.join(coro_parents_strings)
 500                    output_strings.append(f'> Coros traceback:\n{coro_parents_text}')
 501            
 502            if CoroLogsAppendFormatterParts.file_name_and_line_number in self.desired_parts:
 503                output_strings.append(f'> @ {info[InfoFields.file_name]}:{info[InfoFields.line_number]}')
 504
 505            if CoroLogsAppendFormatterParts.traceback in self.desired_parts:
 506                traceback_strings: List[str] = info[InfoFields.traceback_strings]
 507                if traceback_strings:
 508                    traceback_text: str = '\n'.join(traceback_strings)
 509                    traceback_text = traceback_text.strip('\n')
 510                    output_strings.append(f'> Traceback:\n{traceback_text}')
 511            
 512            return '\n'.join(output_strings)
 513
 514
 515class CorosLogsProvider:
 516    def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
 517        self.coros_logs_key: Hashable = coros_logs_key
 518        self.logs_limit: Union[None, int] = logs_limit
 519        self.period: float = period
 520        self.i: Interface = None
 521        self._update_coro_id: CoroID = None
 522        self.force_stop_timeout: RationalNumber = force_stop_timeout
 523        self.fac = FastAggregatorClient()
 524        self._stop = False
 525        self.lifetime_stats: Dict[CoroID, float] = dict()
 526        self._current_logs_taken: bool = False
 527        self._unsend_data: List[List[Tuple[Tuple, Dict]]] = list()
 528        self._log_service_handler_was_added: bool = False
 529    
 530    def start(self):
 531        if self.i is None:
 532            self.i = current_interface()
 533        
 534        self._update_coro_id = self.i(PutCoro, self._update)
 535    
 536    def stop(self):
 537        self._stop = True
 538        current_interface()(PutCoro, self._force_stop)
 539    
 540    async def _force_stop(self, i: Interface):
 541        if self._update_coro_id is not None:
 542            update_coro_id = self._update_coro_id
 543            self._update_coro_id = None
 544            await agraceful_coro_destroyer(i, self.force_stop_timeout, update_coro_id)
 545        
 546        if self._log_service_handler_was_added:
 547            self._log_service_handler_was_added = False
 548            await i(LogRequest().remove_iteration_handler(self))
 549        
 550        self._unsend_data = type(self._unsend_data)()
 551    
 552    async def _update(self, i: Interface):
 553        while not self._stop:
 554            if not self._log_service_handler_was_added:
 555                if i._loop.is_service_registered(Log):
 556                    self._log_service_handler_was_added = True
 557                    await i(LogRequest().add_iteration_handler(self))
 558                    
 559            logs_parts = self._unsend_data
 560            self._unsend_data = type(self._unsend_data)()
 561            for logs_part in logs_parts:
 562                self.fac(self.coros_logs_key, logs_part)
 563            
 564            await i(Sleep, self.period)
 565    
 566    def __call__(self, log_service: Log, data: List[Tuple[Tuple, Dict, Dict[str, Any]]], current_time: float, current_time_str: str):
 567        if self._current_logs_taken:
 568            if self.logs_limit is None:
 569                if data:
 570                    self._unsend_data.append(data)
 571            else:
 572                num_of_an_initial_logs_needed: int = self.logs_limit - len(data)
 573                if 0 <= num_of_an_initial_logs_needed:
 574                    if data:
 575                        self._unsend_data.append(data)
 576                else:
 577                    part_of_data = data[-self.logs_limit:]
 578                    if part_of_data:
 579                        self._unsend_data.append(part_of_data)
 580        else:
 581            self._current_logs_taken = True
 582            if self.logs_limit is None:
 583                combined_data = log_service.get_last_n_logs(None) + data
 584                if combined_data:
 585                    self._unsend_data.append(combined_data)
 586            else:
 587                num_of_an_initial_logs_needed: int = self.logs_limit - len(data)
 588                if 0 < num_of_an_initial_logs_needed:
 589                    combined_data = log_service.get_last_n_logs(num_of_an_initial_logs_needed) + data
 590                    if combined_data:
 591                        self._unsend_data.append(combined_data)
 592                if 0 == num_of_an_initial_logs_needed:
 593                    if data:
 594                        self._unsend_data.append(data)
 595                else:
 596                    part_of_data = data[-self.logs_limit:]
 597                    if part_of_data:
 598                        self._unsend_data.append(part_of_data)
 599
 600
 601class SchedulerPerformanceFormatter:
 602    def __init__(self, loop_iteration_delta_times_key: Hashable, loop_iteration_delta_times_lifetime_stats_key: Hashable, loop_iteration_delta_times_stats_key: Hashable, window_size: int):
 603        self.external_items_key: Hashable = loop_iteration_delta_times_key
 604        self.internal_items_key: Hashable = f'internal_scheduler_tdelta__{uuid4()}'
 605        self.lifetime_stats_key: Hashable = loop_iteration_delta_times_lifetime_stats_key
 606        self.stats_key: Hashable = loop_iteration_delta_times_stats_key
 607        self.window_size: int = window_size
 608        self.fac = FastAggregatorClient()
 609        self.i: Interface = None
 610        self.data_window: List = list()
 611        self.max_deviation: float = None
 612        self.min_deviation: float = None
 613        self.max_iter_per_sec: float = None
 614        self.min_iter_per_sec: float = None
 615        self._stop: bool = False
 616
 617        settings = SubprocessWorkerSettings()
 618        settings.initiation_function = self.process_initializer
 619        settings.working_function = self.process_worker
 620        settings.transport = Transport.queue
 621        settings.sendable_data_type = SendableDataType.marshalable
 622        self.worker_lock: Lock = Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}')
 623        self.worker: SubprocessWorker = SubprocessWorker(settings)
 624        self.worker.start(wait_for_process_readyness=False)
 625        self.worker_is_ready: bool = False
 626    
 627    async def _worker_readyness_waiting_coro(self, i: Interface):
 628        need_to_wait = True
 629        while need_to_wait:
 630            try:
 631                async with self.worker_lock:
 632                    self.worker.wait_for_subprocess_readines(block=False)
 633                
 634                need_to_wait = False
 635            except SubprocessIsNotReadyError:
 636                await i(Sleep, 0.1)
 637        
 638        self.worker_is_ready = True
 639    
 640    async def wait_for_worker_readyness(self, i: Interface):
 641        while not self.worker_is_ready:
 642            await i(Sleep, 0.1)
 643    
 644    def start(self, wr: Optional[TkObjWrapper] = None):
 645        if self.i is None:
 646            self.i = current_interface()
 647        
 648        if wr is None:
 649            self.i(PutCoro, self._worker_readyness_waiting_coro)
 650            self.i(PutCoro, self._update)
 651            self.i(PutCoro, self._update_stats)
 652        else:
 653            wr.put_coro(self._worker_readyness_waiting_coro)
 654            wr.put_coro(self._update)
 655            wr.put_coro(self._update_stats)
 656    
 657    def stop(self):
 658        self._stop = True
 659        need_to_block = False
 660        try:
 661            i: Interface = current_interface()
 662        except:
 663            need_to_block = True
 664        
 665        with self.worker_lock:
 666            if need_to_block:
 667                self.worker.stop()
 668            else:
 669                run_in_thread_pool_fast(i, self.worker.stop)
 670    
 671    def _update(self, i: Interface):
 672        i(RunCoro, self.wait_for_worker_readyness)
 673        while not self._stop:
 674            # start = perf_counter()
 675            i(Yield)
 676            # stop = perf_counter()
 677            # delta_time = stop - start
 678            loop: CoroSchedulerType = i._loop
 679            delta_time = loop.loop_iteration_delta_time
 680            self.fac(self.external_items_key, delta_time)
 681            self.fac(self.internal_items_key, delta_time)
 682
 683    def _update_stats(self, i: Interface):
 684        i(RunCoro, self.wait_for_worker_readyness)
 685        while not self._stop:
 686            try:
 687                data = i(FastAggregator, self.internal_items_key)
 688                if len(data) >= self.window_size:
 689                    self.data_window = data[-self.window_size:]
 690                else:
 691                    additional_data_len = self.window_size - len(data)
 692                    if len(self.data_window) >= additional_data_len:
 693                        self.data_window = self.data_window[-additional_data_len:] + data
 694                    else:
 695                        self.data_window.extend(data)
 696                
 697                iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = self.compute_stats(i, self.data_window)
 698                if self.max_deviation is None:
 699                    self.max_deviation = max_deviation
 700                else:
 701                    self.max_deviation = max(self.max_deviation, max_deviation)
 702                
 703                if self.min_deviation is None:
 704                    self.min_deviation = min_deviation
 705                else:
 706                    self.min_deviation = min(self.min_deviation, min_deviation)
 707                
 708                if self.max_iter_per_sec is None:
 709                    self.max_iter_per_sec = iter_per_sec
 710                else:
 711                    self.max_iter_per_sec = max(self.max_iter_per_sec, iter_per_sec)
 712                
 713                if self.min_iter_per_sec is None:
 714                    self.min_iter_per_sec = iter_per_sec
 715                else:
 716                    self.min_iter_per_sec = min(self.min_iter_per_sec, iter_per_sec)
 717                
 718                self.fac(self.stats_key, (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation))
 719                self.fac(self.lifetime_stats_key, (self.max_iter_per_sec, self.min_iter_per_sec, self.max_deviation, self.min_deviation))
 720                
 721                self.window_size = 2 * int(round(iter_per_sec))
 722            except KeyError:
 723                pass
 724            
 725            i(Sleep, 0.2)
 726    
 727    def compute_stats(self, i: Interface, data):
 728        with self.worker_lock:
 729            self.worker.send_data_to_subprocess(data)
 730            result = None
 731            got_result = False
 732            while not got_result:
 733                try:
 734                    result = self.worker.get_answer_from_subprocess(block=False)
 735                except Empty:
 736                    pass
 737                else:
 738                    got_result = True
 739                
 740                if not got_result:
 741                    i(Sleep, 0.005)
 742                
 743            return result
 744    
 745    @staticmethod
 746    def process_initializer(init_data):
 747        pass
 748    
 749    @staticmethod
 750    def process_worker(global_data, data):
 751        from cengal.statistics.normal_distribution import count_99_95_68
 752        val_99, val_95, val_68, max_deviation, min_deviation = count_99_95_68(data)
 753        average_data = average(data)
 754        iter_per_sec = 0 if 0 == average_data else (1 / average_data)
 755        return (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)
 756        
 757    @staticmethod
 758    def item_formatter(data) -> str:
 759        return f'{data}'
 760        
 761    @staticmethod
 762    def stats_formatter(data) -> str:
 763        i: Interface = current_interface()
 764        iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = data
 765        return f'{i.coro_id}. Stats:\niter_per_sec: {iter_per_sec}\nval_99: {val_99}\nval_95: {val_95}\nval_68: {val_68}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
 766        
 767    @staticmethod
 768    def lifetime_stats_formatter(data) -> str:
 769        i: Interface = current_interface()
 770        max_iter_per_sec, min_iter_per_sec, max_deviation, min_deviation = data
 771        return f'{i.coro_id}. Lifetime Stats:\nmax_iter_per_sec: {max_iter_per_sec}\nmin_iter_per_sec: {min_iter_per_sec}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
 772
 773
 774class HelpDialog(simpledialog.Dialog):
 775    def body(self, master):
 776        self.title('Console Description and Help')
 777
 778        text_area_config = {
 779            'highlightcolor': Style.instance.colors.primary,
 780            'highlightbackground': Style.instance.colors.border,
 781            'highlightthickness': 1,
 782            'wrap': 'none',
 783            'width': 120,
 784            'height': 30,
 785        }
 786
 787        self.text_area = ReadOnlyText(self, **text_area_config)
 788        self.text_area.pack(fill='both', expand='yes')
 789        self.text_area._text.delete('1.0', 'end')
 790        self.text_area._text.insert('end', coro_holder_template)
 791        
 792        return self.text_area  # initial focus
 793
 794
 795class FilterSetupDialog(ttkb.Toplevel):
 796    def __init__(self, parent, filtered_paths):
 797        # position is centered to parent
 798        parent_pos_left, parent_pos_top = parent.winfo_x(), parent.winfo_y()
 799        parent_size_x, parent_size_y = parent.winfo_width(), parent.winfo_height()
 800        own_size_x, own_size_y = 600, 400
 801        own_pos_x = parent_pos_left + (parent_size_x - own_size_x) // 2
 802        own_pos_y = parent_pos_top + (parent_size_y - own_size_y) // 2
 803        super().__init__(parent, size=(own_size_x, own_size_y), resizable=(True, True), position=(own_pos_x, own_pos_y))
 804        self.filtered_paths = filtered_paths
 805        self.title('Filtered paths setup')
 806        self.transient(parent)
 807        self.result = None
 808
 809        ttkb.Label(self, text="Enter your text:").pack(fill=None, expand='no', side='top')
 810        self.text_entry = TtkbScrolledText(self, width=30, height=10)
 811        self.text_entry.pack(fill='both', expand='yes', side='top')
 812
 813        button_frame = ttkb.Frame(self)
 814        button_frame.pack(fill='x', expand='no', side='right')
 815
 816        ttkb.Button(button_frame, text='OK', command=self.ok).pack(side=ttkb.LEFT, padx=1)
 817        ttkb.Button(button_frame, text='Cancel', command=self.cancel).pack(side=ttkb.LEFT, padx=1)
 818
 819        self.protocol("WM_DELETE_WINDOW", self.cancel)
 820        self.text_entry.focus_set()
 821        filtered_paths_list = list()
 822        for path in self.filtered_paths:
 823            filtered_paths_list.append(dict_path_to_str(path))
 824        
 825        filtered_paths_text = '\n'.join(filtered_paths_list) + '\n'
 826        self.text_entry._text.insert('1.0', filtered_paths_text)
 827
 828        self.text_entry._text.bind("<Control-Return>", self.ctrl_enter)
 829        self.text_entry._text.bind("<Escape>", self.escape)
 830        self.text_entry._text.focus_set()
 831
 832    def ctrl_enter(self, event):
 833        self.ok(event)
 834        return 'break'
 835
 836    def escape(self, event):
 837        self.cancel(event)
 838        return 'break'
 839
 840    def ok(self, event=None):
 841        filtered_paths_text = self.text_entry.get('1.0', 'end-1c')
 842        result_lines = filtered_paths_text.split('\n')
 843        filtered_paths = list()
 844        for line in result_lines:
 845            if line:
 846                filtered_paths.append(srt_to_dict_path(line))
 847        
 848        self.result = filtered_paths
 849        self.destroy()
 850
 851    def cancel(self, event=None):
 852        self.result = self.filtered_paths
 853        self.destroy()
 854
 855
 856# class Application(Tk):
 857class Application(ttkb.Window):
 858    def __init__(self, style: str = 'superhero', filtered_paths: List[List[str]] = None, current_children_pack_type: int = 1):
 859        super().__init__(size=(1900, 900), resizable=(True, True))
 860        self.style_name = style
 861        Style(style)
 862        self.current_children_pack_type: int = current_children_pack_type
 863        self.max_children_pack_type: int = 2
 864        self.packed_widgets: List[tk.Widget] = list()
 865        self.filtered_paths: List[Sequence[str]] = filtered_paths
 866
 867        self.title('CoroScheduler Admin')
 868        self.resizable(width=True, height=True)
 869
 870        self.sfmp = CSStatsFormatterMultiprocess(self.filtered_paths)
 871        
 872        self.scheduler_stats_frame = ttkb.Frame(self)
 873        self.scheduler_stats_control_frame = ttkb.Frame(self.scheduler_stats_frame)
 874        
 875        self.scheduler_stats_layout_button_text = self.scheduler_stats_layout_button_name()
 876        self.scheduler_stats_layout_button_text_len = len('L') + 1
 877        self.scheduler_stats_layout_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_layout_button_text, width=self.scheduler_stats_layout_button_text_len, command=self.scheduler_stats_layout_button_on_click)
 878        self.scheduler_stats_layout_button_tooltip = ToolTipHovered(self.scheduler_stats_layout_button, 'Layout circle switch')
 879
 880        self.scheduler_stats_format_button_text = 'F'
 881        self.scheduler_stats_format_button_text_len = len(self.scheduler_stats_format_button_text) + 1
 882        self.scheduler_stats_format_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_format_button_text, width=self.scheduler_stats_format_button_text_len, command=self.scheduler_stats_format_button_on_click)
 883        self.scheduler_stats_format_button_tooltip = ToolTipHovered(self.scheduler_stats_format_button, 'Filter our unnecessary or flickering paths')
 884        
 885        self.scheduler_stats_help_button_text = '?'
 886        self.scheduler_stats_help_button_text_len = len(self.scheduler_stats_help_button_text) + 1
 887        self.scheduler_stats_help_button = ttkb.Button(self.scheduler_stats_control_frame, text=self.scheduler_stats_help_button_text, width=self.scheduler_stats_help_button_text_len, bootstyle="info", command=self.scheduler_stats_help_button_on_click)
 888        self.scheduler_stats_help_button_tooltip = ToolTipHovered(self.scheduler_stats_help_button, 'Console description and help')
 889
 890        self.coro_scheduler_view = AggregatorView(False, False, 'coro_scheduler_stats', 1, self.sfmp, 25, 23, self.scheduler_stats_frame)
 891
 892        self.coro_logs_formatter: CoroLogsAppendFormatter = CoroLogsAppendFormatter('Coro Logs')
 893        self.coro_logs_provider: CorosLogsProvider = CorosLogsProvider('coro scheduler logs', None, 0.25)
 894        self.coro_scheduler_logs_view = AggregatorView(True, True, 'coro scheduler logs', 1, self.coro_logs_formatter, 25, 23, self.scheduler_stats_frame)
 895
 896        self.command_executor_view = CommandExecutor(160, 6)
 897        
 898        self.cmet = CorosMaxExecutionTimesProvider('coros lifetime max execution times', 'coros max execution times', 0.5)
 899        
 900        self.coros_lifetime_max_execution_times = AggregatorView(False, False, 'coros lifetime max execution times', 0.75, self.cmet.lifetime_stats_formatter, 25, 13, self)
 901        
 902        self.coros_max_execution_times = AggregatorView(False, False, 'coros max execution times', 0.75, self.cmet.stats_formatter, 25, 13, self)
 903        
 904        self.spf = SchedulerPerformanceFormatter('loop_iteration_delta_times', 'loop_iteration_delta_times_lifetime_stats', 'loop_iteration_delta_times_stats', 5000)
 905        
 906        self.scheduler_tdelta_formatter = AggregatorAppendFormatter('Scheduler TDelta')
 907        self.scheduler_tdelta = AggregatorView(True, False, 'loop_iteration_delta_times', 0.1, self.scheduler_tdelta_formatter, 22, 13, self)
 908        self.scheduler_tdelta.default_auto_scroll = False
 909        self.scheduler_tdelta.max_len = 1000
 910        
 911        def aggregator_view_formatter(data: Any) -> str:
 912            return f'{current_interface().coro_id}. Aggregator:\n{data}'
 913        
 914        self.command_executor_aggregator_view = AggregatorView(False, False, command_executor_aggregator_view_key, 1, aggregator_view_formatter, 25, 13, self)
 915        FastAggregatorClient()(command_executor_aggregator_view_key, str())
 916        
 917        self.command_executor_aggregator_append_formatter = AggregatorAppendFormatter('Aggregator Append')
 918        self.command_executor_aggregator_append_view = AggregatorView(True, True, command_executor_aggregator_append_view_key, 1, self.command_executor_aggregator_append_formatter, 25, 13, self)
 919        self.command_executor_aggregator_append_view.max_len = 5000
 920        FastAggregatorClient()(command_executor_aggregator_append_view_key, str())
 921
 922        self.scheduler_lifetime_stats = AggregatorView(False, False, 'loop_iteration_delta_times_lifetime_stats', 0.3, self.spf.lifetime_stats_formatter, 36, 5, self)
 923        
 924        self.scheduler_stats = AggregatorView(False, False, 'loop_iteration_delta_times_stats', 0.3, self.spf.stats_formatter, 36, 7, self)
 925
 926        self.pack_children(self.current_children_pack_type)
 927
 928        # set window size taking into account the size of the widgets
 929        self.update()
 930        width = self.winfo_width()
 931        height = self.winfo_height()
 932        x = (self.winfo_screenwidth() // 2) - (width // 2)
 933        y = (self.winfo_screenheight() // 2) - (height // 2)
 934        self.geometry('{}x{}+{}+{}'.format(width, height, x, y))
 935        self.update()
 936    
 937    def pack_forget_children(self):
 938        self.packed_widgets.reverse()
 939        for widget in self.packed_widgets:
 940            widget.pack_forget()
 941    
 942    def pack_children(self, pack_type: int):
 943        self.pack_forget_children()
 944        self.packed_widgets.clear()
 945        if 0 == pack_type:
 946            self.scheduler_stats_layout_button.pack(fill=None, expand='no', side='top', pady=1)
 947            self.packed_widgets.append(self.scheduler_stats_layout_button)
 948            self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1)
 949            self.packed_widgets.append(self.scheduler_stats_format_button)
 950            self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1)
 951            self.packed_widgets.append(self.scheduler_stats_help_button)
 952            self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left')
 953            self.packed_widgets.append(self.scheduler_stats_control_frame)
 954            self.coro_scheduler_view.pack(fill='both', expand='yes', side='left')
 955            self.packed_widgets.append(self.coro_scheduler_view)
 956            self.coro_scheduler_logs_view.pack(fill='both', expand='no', side='left')
 957            self.packed_widgets.append(self.coro_scheduler_logs_view)
 958
 959            self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom')
 960            self.packed_widgets.append(self.scheduler_stats_frame)
 961            self.command_executor_view.pack(fill='x', expand='no', side='bottom')
 962            self.packed_widgets.append(self.command_executor_view)
 963            self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left')
 964            self.packed_widgets.append(self.coros_lifetime_max_execution_times)
 965            self.coros_max_execution_times.pack(fill='x', expand='no', side='left')
 966            self.packed_widgets.append(self.coros_max_execution_times)
 967            self.scheduler_tdelta.pack(fill='x', expand='no', side='left')
 968            self.packed_widgets.append(self.scheduler_tdelta)
 969            self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right')
 970            self.packed_widgets.append(self.command_executor_aggregator_view)
 971            self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right')
 972            self.packed_widgets.append(self.command_executor_aggregator_append_view)
 973            self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top')
 974            self.packed_widgets.append(self.scheduler_lifetime_stats)
 975            self.scheduler_stats.pack(fill=None, expand='no', side='bottom')
 976            self.packed_widgets.append(self.scheduler_stats)
 977        elif 1 == pack_type:
 978            self.scheduler_stats_layout_button.pack(fill=None, expand='no', side='top', pady=1)
 979            self.packed_widgets.append(self.scheduler_stats_layout_button)
 980            self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1)
 981            self.packed_widgets.append(self.scheduler_stats_format_button)
 982            self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1)
 983            self.packed_widgets.append(self.scheduler_stats_help_button)
 984            self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left')
 985            self.packed_widgets.append(self.scheduler_stats_control_frame)
 986            self.coro_scheduler_view.pack(fill='both', expand='yes', side='left')
 987            self.packed_widgets.append(self.coro_scheduler_view)
 988            self.coro_scheduler_logs_view.pack(fill='both', expand='yes', side='left')
 989            self.packed_widgets.append(self.coro_scheduler_logs_view)
 990
 991            self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom')
 992            self.packed_widgets.append(self.scheduler_stats_frame)
 993            self.command_executor_view.pack(fill='x', expand='no', side='bottom')
 994            self.packed_widgets.append(self.command_executor_view)
 995            self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left')
 996            self.packed_widgets.append(self.coros_lifetime_max_execution_times)
 997            self.coros_max_execution_times.pack(fill='x', expand='no', side='left')
 998            self.packed_widgets.append(self.coros_max_execution_times)
 999            self.scheduler_tdelta.pack(fill='x', expand='no', side='left')
1000            self.packed_widgets.append(self.scheduler_tdelta)
1001            self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right')
1002            self.packed_widgets.append(self.command_executor_aggregator_view)
1003            self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right')
1004            self.packed_widgets.append(self.command_executor_aggregator_append_view)
1005            self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top')
1006            self.packed_widgets.append(self.scheduler_lifetime_stats)
1007            self.scheduler_stats.pack(fill=None, expand='no', side='bottom')
1008            self.packed_widgets.append(self.scheduler_stats)
1009        elif 2 == pack_type:
1010            self.scheduler_stats_layout_button.pack(fill=None, expand='no', side='top', pady=1)
1011            self.packed_widgets.append(self.scheduler_stats_layout_button)
1012            self.scheduler_stats_format_button.pack(fill=None, expand='no', side='top', pady=1)
1013            self.packed_widgets.append(self.scheduler_stats_format_button)
1014            self.scheduler_stats_help_button.pack(fill=None, expand='no', side='top', pady=1)
1015            self.packed_widgets.append(self.scheduler_stats_help_button)
1016            self.scheduler_stats_control_frame.pack(fill='y', expand='no', side='left')
1017            self.packed_widgets.append(self.scheduler_stats_control_frame)
1018            self.coro_scheduler_view.pack(fill='both', expand='no', side='left')
1019            self.packed_widgets.append(self.coro_scheduler_view)
1020            self.coro_scheduler_logs_view.pack(fill='both', expand='yes', side='left')
1021            self.packed_widgets.append(self.coro_scheduler_logs_view)
1022
1023            self.scheduler_stats_frame.pack(fill='both', expand='yes', side='bottom')
1024            self.packed_widgets.append(self.scheduler_stats_frame)
1025            self.command_executor_view.pack(fill='x', expand='no', side='bottom')
1026            self.packed_widgets.append(self.command_executor_view)
1027            self.coros_lifetime_max_execution_times.pack(fill='x', expand='no', side='left')
1028            self.packed_widgets.append(self.coros_lifetime_max_execution_times)
1029            self.coros_max_execution_times.pack(fill='x', expand='no', side='left')
1030            self.packed_widgets.append(self.coros_max_execution_times)
1031            self.scheduler_tdelta.pack(fill='x', expand='no', side='left')
1032            self.packed_widgets.append(self.scheduler_tdelta)
1033            self.command_executor_aggregator_view.pack(fill='x', expand='yes', side='right')
1034            self.packed_widgets.append(self.command_executor_aggregator_view)
1035            self.command_executor_aggregator_append_view.pack(fill='x', expand='yes', side='right')
1036            self.packed_widgets.append(self.command_executor_aggregator_append_view)
1037            self.scheduler_lifetime_stats.pack(fill=None, expand='no', side='top')
1038            self.packed_widgets.append(self.scheduler_lifetime_stats)
1039            self.scheduler_stats.pack(fill=None, expand='no', side='bottom')
1040            self.packed_widgets.append(self.scheduler_stats)
1041        else:
1042            raise NotImplementedError
1043    
1044    def scheduler_stats_layout_button_name(self) -> str:
1045        return 'L' if 0 == self.current_children_pack_type else 'L' + str(self.current_children_pack_type)
1046
1047    def scheduler_stats_layout_button_on_click(self):
1048        self.current_children_pack_type += 1
1049        if self.current_children_pack_type > self.max_children_pack_type:
1050            self.current_children_pack_type = 0
1051        
1052        self.scheduler_stats_layout_button_text = self.scheduler_stats_layout_button_name()
1053        self.scheduler_stats_layout_button.config(text=self.scheduler_stats_layout_button_text)
1054        self.pack_children(self.current_children_pack_type)
1055    
1056    def scheduler_stats_format_button_on_click(self):
1057        d = FilterSetupDialog(self, self.sfmp.filtered_paths)
1058        self.wait_window(d)
1059        self.sfmp.update_filtered_paths(d.result)
1060    
1061    def scheduler_stats_help_button_on_click(self):
1062        HelpDialog(self)
1063
1064    def start(self, wr: TkObjWrapper):
1065        self.spf.start()
1066        self.sfmp.start(wr)
1067        self.coro_logs_provider.start()
1068        self.scheduler_tdelta.start(wr)
1069        self.command_executor_aggregator_view.start(wr)
1070        self.command_executor_aggregator_append_view.start(wr)
1071        self.scheduler_lifetime_stats.start(wr)
1072        self.scheduler_stats.start(wr)
1073        self.coro_scheduler_view.start(wr)
1074        self.coro_scheduler_logs_view.start(wr)
1075        self.cmet.start()
1076        self.coros_lifetime_max_execution_times.start(wr)
1077        self.coros_max_execution_times.start(wr)
1078    
1079    def stop(self):
1080        self.spf.stop()
1081        self.sfmp.stop()
1082        self.coro_logs_provider.stop()
1083        self.scheduler_tdelta.stop()
1084        self.command_executor_aggregator_view.stop()
1085        self.command_executor_aggregator_append_view.stop()
1086        self.scheduler_lifetime_stats.stop()
1087        self.scheduler_stats.stop()
1088        self.coro_scheduler_view.stop()
1089        self.coro_scheduler_logs_view.stop()
1090        self.cmet.stop()
1091        self.coros_lifetime_max_execution_times.stop()
1092        self.coros_max_execution_times.stop()
1093    
1094    def prepare(self, i: Interface, wr: TkObjWrapper):
1095        i(Yield)
1096        self.start(wr)
1097        i(Yield)
1098
1099
1100def coro_scheduler_admin__view(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None):
1101    app_args, app_kwargs = app_args_kwargs() if app_args_kwargs is not None else ArgsKwargs()()
1102    with TkinterContextManager(i, Application(*app_args, **app_kwargs)) as wr:
1103        app: Application = cast(Application, wr.tk)
1104        app.prepare(i, wr)
1105        i(InstanceRequest().set('admin_tk_app', app))
1106    
1107    if on_close is not None:
1108        app.stop()
1109        i(RunCoro, on_close)
1110
1111
1112def scheduler_stats_aggregator_provider(i: Interface, fac_key: str = 'coro_scheduler_stats'):
1113    cs: CoroSchedulerType = i._loop
1114    fac = FastAggregatorClient()
1115    while True:
1116        stats_level: EntityStatsMixin.StatsLevel = EntityStatsMixin.StatsLevel.info
1117        result = dict()
1118        name, stats = cs.get_entity_stats(stats_level)
1119        result[name] = stats
1120        fac(fac_key, result)
1121        i(Sleep, 0.5)
1122
1123
1124def cs_init(cs: CoroSchedulerType):
1125    cs.set_coro_time_measurement(True)
1126    cs.set_coro_history_gathering(True)
1127    cs.set_loop_iteration_time_measurement(True)
1128
1129
1130async def start_admin(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None):
1131    cs_init(i._loop)
1132    await i(PutCoroRequest().turn_on_tree_monitoring(True))
1133    await i(PutCoro, scheduler_stats_aggregator_provider)
1134    await i(PutCoro, coro_scheduler_admin__view, on_close, app_args_kwargs)
async def start_admin( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, on_close: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]], NoneType] = None, app_args_kwargs: typing.Union[cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs, NoneType] = None):
1131async def start_admin(i: Interface, on_close: Optional[AnyWorker] = None, app_args_kwargs: Optional[ArgsKwargs] = None):
1132    cs_init(i._loop)
1133    await i(PutCoroRequest().turn_on_tree_monitoring(True))
1134    await i(PutCoro, scheduler_stats_aggregator_provider)
1135    await i(PutCoro, coro_scheduler_admin__view, on_close, app_args_kwargs)
def cs_init( cs: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable]):
1125def cs_init(cs: CoroSchedulerType):
1126    cs.set_coro_time_measurement(True)
1127    cs.set_coro_history_gathering(True)
1128    cs.set_loop_iteration_time_measurement(True)
class SchedulerPerformanceFormatter:
602class SchedulerPerformanceFormatter:
603    def __init__(self, loop_iteration_delta_times_key: Hashable, loop_iteration_delta_times_lifetime_stats_key: Hashable, loop_iteration_delta_times_stats_key: Hashable, window_size: int):
604        self.external_items_key: Hashable = loop_iteration_delta_times_key
605        self.internal_items_key: Hashable = f'internal_scheduler_tdelta__{uuid4()}'
606        self.lifetime_stats_key: Hashable = loop_iteration_delta_times_lifetime_stats_key
607        self.stats_key: Hashable = loop_iteration_delta_times_stats_key
608        self.window_size: int = window_size
609        self.fac = FastAggregatorClient()
610        self.i: Interface = None
611        self.data_window: List = list()
612        self.max_deviation: float = None
613        self.min_deviation: float = None
614        self.max_iter_per_sec: float = None
615        self.min_iter_per_sec: float = None
616        self._stop: bool = False
617
618        settings = SubprocessWorkerSettings()
619        settings.initiation_function = self.process_initializer
620        settings.working_function = self.process_worker
621        settings.transport = Transport.queue
622        settings.sendable_data_type = SendableDataType.marshalable
623        self.worker_lock: Lock = Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}')
624        self.worker: SubprocessWorker = SubprocessWorker(settings)
625        self.worker.start(wait_for_process_readyness=False)
626        self.worker_is_ready: bool = False
627    
628    async def _worker_readyness_waiting_coro(self, i: Interface):
629        need_to_wait = True
630        while need_to_wait:
631            try:
632                async with self.worker_lock:
633                    self.worker.wait_for_subprocess_readines(block=False)
634                
635                need_to_wait = False
636            except SubprocessIsNotReadyError:
637                await i(Sleep, 0.1)
638        
639        self.worker_is_ready = True
640    
641    async def wait_for_worker_readyness(self, i: Interface):
642        while not self.worker_is_ready:
643            await i(Sleep, 0.1)
644    
645    def start(self, wr: Optional[TkObjWrapper] = None):
646        if self.i is None:
647            self.i = current_interface()
648        
649        if wr is None:
650            self.i(PutCoro, self._worker_readyness_waiting_coro)
651            self.i(PutCoro, self._update)
652            self.i(PutCoro, self._update_stats)
653        else:
654            wr.put_coro(self._worker_readyness_waiting_coro)
655            wr.put_coro(self._update)
656            wr.put_coro(self._update_stats)
657    
658    def stop(self):
659        self._stop = True
660        need_to_block = False
661        try:
662            i: Interface = current_interface()
663        except:
664            need_to_block = True
665        
666        with self.worker_lock:
667            if need_to_block:
668                self.worker.stop()
669            else:
670                run_in_thread_pool_fast(i, self.worker.stop)
671    
672    def _update(self, i: Interface):
673        i(RunCoro, self.wait_for_worker_readyness)
674        while not self._stop:
675            # start = perf_counter()
676            i(Yield)
677            # stop = perf_counter()
678            # delta_time = stop - start
679            loop: CoroSchedulerType = i._loop
680            delta_time = loop.loop_iteration_delta_time
681            self.fac(self.external_items_key, delta_time)
682            self.fac(self.internal_items_key, delta_time)
683
684    def _update_stats(self, i: Interface):
685        i(RunCoro, self.wait_for_worker_readyness)
686        while not self._stop:
687            try:
688                data = i(FastAggregator, self.internal_items_key)
689                if len(data) >= self.window_size:
690                    self.data_window = data[-self.window_size:]
691                else:
692                    additional_data_len = self.window_size - len(data)
693                    if len(self.data_window) >= additional_data_len:
694                        self.data_window = self.data_window[-additional_data_len:] + data
695                    else:
696                        self.data_window.extend(data)
697                
698                iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = self.compute_stats(i, self.data_window)
699                if self.max_deviation is None:
700                    self.max_deviation = max_deviation
701                else:
702                    self.max_deviation = max(self.max_deviation, max_deviation)
703                
704                if self.min_deviation is None:
705                    self.min_deviation = min_deviation
706                else:
707                    self.min_deviation = min(self.min_deviation, min_deviation)
708                
709                if self.max_iter_per_sec is None:
710                    self.max_iter_per_sec = iter_per_sec
711                else:
712                    self.max_iter_per_sec = max(self.max_iter_per_sec, iter_per_sec)
713                
714                if self.min_iter_per_sec is None:
715                    self.min_iter_per_sec = iter_per_sec
716                else:
717                    self.min_iter_per_sec = min(self.min_iter_per_sec, iter_per_sec)
718                
719                self.fac(self.stats_key, (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation))
720                self.fac(self.lifetime_stats_key, (self.max_iter_per_sec, self.min_iter_per_sec, self.max_deviation, self.min_deviation))
721                
722                self.window_size = 2 * int(round(iter_per_sec))
723            except KeyError:
724                pass
725            
726            i(Sleep, 0.2)
727    
728    def compute_stats(self, i: Interface, data):
729        with self.worker_lock:
730            self.worker.send_data_to_subprocess(data)
731            result = None
732            got_result = False
733            while not got_result:
734                try:
735                    result = self.worker.get_answer_from_subprocess(block=False)
736                except Empty:
737                    pass
738                else:
739                    got_result = True
740                
741                if not got_result:
742                    i(Sleep, 0.005)
743                
744            return result
745    
746    @staticmethod
747    def process_initializer(init_data):
748        pass
749    
750    @staticmethod
751    def process_worker(global_data, data):
752        from cengal.statistics.normal_distribution import count_99_95_68
753        val_99, val_95, val_68, max_deviation, min_deviation = count_99_95_68(data)
754        average_data = average(data)
755        iter_per_sec = 0 if 0 == average_data else (1 / average_data)
756        return (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)
757        
758    @staticmethod
759    def item_formatter(data) -> str:
760        return f'{data}'
761        
762    @staticmethod
763    def stats_formatter(data) -> str:
764        i: Interface = current_interface()
765        iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = data
766        return f'{i.coro_id}. Stats:\niter_per_sec: {iter_per_sec}\nval_99: {val_99}\nval_95: {val_95}\nval_68: {val_68}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
767        
768    @staticmethod
769    def lifetime_stats_formatter(data) -> str:
770        i: Interface = current_interface()
771        max_iter_per_sec, min_iter_per_sec, max_deviation, min_deviation = data
772        return f'{i.coro_id}. Lifetime Stats:\nmax_iter_per_sec: {max_iter_per_sec}\nmin_iter_per_sec: {min_iter_per_sec}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
SchedulerPerformanceFormatter( loop_iteration_delta_times_key: typing.Hashable, loop_iteration_delta_times_lifetime_stats_key: typing.Hashable, loop_iteration_delta_times_stats_key: typing.Hashable, window_size: int)
603    def __init__(self, loop_iteration_delta_times_key: Hashable, loop_iteration_delta_times_lifetime_stats_key: Hashable, loop_iteration_delta_times_stats_key: Hashable, window_size: int):
604        self.external_items_key: Hashable = loop_iteration_delta_times_key
605        self.internal_items_key: Hashable = f'internal_scheduler_tdelta__{uuid4()}'
606        self.lifetime_stats_key: Hashable = loop_iteration_delta_times_lifetime_stats_key
607        self.stats_key: Hashable = loop_iteration_delta_times_stats_key
608        self.window_size: int = window_size
609        self.fac = FastAggregatorClient()
610        self.i: Interface = None
611        self.data_window: List = list()
612        self.max_deviation: float = None
613        self.min_deviation: float = None
614        self.max_iter_per_sec: float = None
615        self.min_iter_per_sec: float = None
616        self._stop: bool = False
617
618        settings = SubprocessWorkerSettings()
619        settings.initiation_function = self.process_initializer
620        settings.working_function = self.process_worker
621        settings.transport = Transport.queue
622        settings.sendable_data_type = SendableDataType.marshalable
623        self.worker_lock: Lock = Lock(f'SchedulerPerformanceFormatter.worker__{uuid4()}')
624        self.worker: SubprocessWorker = SubprocessWorker(settings)
625        self.worker.start(wait_for_process_readyness=False)
626        self.worker_is_ready: bool = False
external_items_key: Hashable
internal_items_key: Hashable
lifetime_stats_key: Hashable
stats_key: Hashable
window_size: int
fac
i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface
data_window: List
max_deviation: float
min_deviation: float
max_iter_per_sec: float
min_iter_per_sec: float
worker_lock: cengal.parallel_execution.coroutines.coro_tools.lock.versions.v_0.lock.Lock
worker: cengal.parallel_execution.multiprocess.multiprocessing_task_runner.SubprocessWorker
worker_is_ready: bool
async def wait_for_worker_readyness( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface):
641    async def wait_for_worker_readyness(self, i: Interface):
642        while not self.worker_is_ready:
643            await i(Sleep, 0.1)
def start( self, wr: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.tkinter.versions.v_0.tkinter.TkObjWrapper, NoneType] = None):
645    def start(self, wr: Optional[TkObjWrapper] = None):
646        if self.i is None:
647            self.i = current_interface()
648        
649        if wr is None:
650            self.i(PutCoro, self._worker_readyness_waiting_coro)
651            self.i(PutCoro, self._update)
652            self.i(PutCoro, self._update_stats)
653        else:
654            wr.put_coro(self._worker_readyness_waiting_coro)
655            wr.put_coro(self._update)
656            wr.put_coro(self._update_stats)
def stop(self):
658    def stop(self):
659        self._stop = True
660        need_to_block = False
661        try:
662            i: Interface = current_interface()
663        except:
664            need_to_block = True
665        
666        with self.worker_lock:
667            if need_to_block:
668                self.worker.stop()
669            else:
670                run_in_thread_pool_fast(i, self.worker.stop)
def compute_stats( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, data):
728    def compute_stats(self, i: Interface, data):
729        with self.worker_lock:
730            self.worker.send_data_to_subprocess(data)
731            result = None
732            got_result = False
733            while not got_result:
734                try:
735                    result = self.worker.get_answer_from_subprocess(block=False)
736                except Empty:
737                    pass
738                else:
739                    got_result = True
740                
741                if not got_result:
742                    i(Sleep, 0.005)
743                
744            return result
@staticmethod
def process_initializer(init_data):
746    @staticmethod
747    def process_initializer(init_data):
748        pass
@staticmethod
def process_worker(global_data, data):
750    @staticmethod
751    def process_worker(global_data, data):
752        from cengal.statistics.normal_distribution import count_99_95_68
753        val_99, val_95, val_68, max_deviation, min_deviation = count_99_95_68(data)
754        average_data = average(data)
755        iter_per_sec = 0 if 0 == average_data else (1 / average_data)
756        return (iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation)
@staticmethod
def item_formatter(data) -> str:
758    @staticmethod
759    def item_formatter(data) -> str:
760        return f'{data}'
@staticmethod
def stats_formatter(data) -> str:
762    @staticmethod
763    def stats_formatter(data) -> str:
764        i: Interface = current_interface()
765        iter_per_sec, val_99, val_95, val_68, max_deviation, min_deviation = data
766        return f'{i.coro_id}. Stats:\niter_per_sec: {iter_per_sec}\nval_99: {val_99}\nval_95: {val_95}\nval_68: {val_68}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
@staticmethod
def lifetime_stats_formatter(data) -> str:
768    @staticmethod
769    def lifetime_stats_formatter(data) -> str:
770        i: Interface = current_interface()
771        max_iter_per_sec, min_iter_per_sec, max_deviation, min_deviation = data
772        return f'{i.coro_id}. Lifetime Stats:\nmax_iter_per_sec: {max_iter_per_sec}\nmin_iter_per_sec: {min_iter_per_sec}\nmax_deviation: {max_deviation}\nmin_deviation: {min_deviation}'
def scheduler_stats_aggregator_provider( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, fac_key: str = 'coro_scheduler_stats'):
1113def scheduler_stats_aggregator_provider(i: Interface, fac_key: str = 'coro_scheduler_stats'):
1114    cs: CoroSchedulerType = i._loop
1115    fac = FastAggregatorClient()
1116    while True:
1117        stats_level: EntityStatsMixin.StatsLevel = EntityStatsMixin.StatsLevel.info
1118        result = dict()
1119        name, stats = cs.get_entity_stats(stats_level)
1120        result[name] = stats
1121        fac(fac_key, result)
1122        i(Sleep, 0.5)
class CSStatsFormatterMultiprocess:
272class CSStatsFormatterMultiprocess:
273    def __init__(self, filtered_paths: List[Sequence[str]] = None):
274        self.filtered_paths: List[Sequence[str]] = [
275            ['CoroSchedulerBase', 'loop', 'coroutines execution times'],
276            ['CoroSchedulerBase', 'loop', 'longest continuous execution time of coroutines'],
277            ['CoroSchedulerGreenlet', 'loop', 'coroutines execution times'],
278            ['CoroSchedulerGreenlet', 'loop', 'longest continuous execution time of coroutines'],
279            ['CoroSchedulerAwaitable', 'loop', 'coroutines execution times'],
280            ['CoroSchedulerAwaitable', 'loop', 'longest continuous execution time of coroutines'],
281        ] if filtered_paths is None else filtered_paths
282        self._stop: bool = False
283        self._subprocess_started: bool = False
284        settings = SubprocessWorkerSettings()
285        settings.initiation_function = self.process_initializer
286        settings.working_function = self.process_worker
287        settings.transport = Transport.queue
288        settings.sendable_data_type = SendableDataType.marshalable
289        settings.use_internal_subprocess_input_buffer = True
290        settings.initialization_data = {
291            'filtered_paths': self.filtered_paths
292        }
293        
294        self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
295        self.worker: SubprocessWorker = SubprocessWorker(settings)
296        self.worker.start(wait_for_process_readyness=False)
297        self.worker_is_ready: bool = False
298    
299    async def _worker_readyness_waiting_coro(self, i: Interface):
300        need_to_wait = True
301        while need_to_wait:
302            try:
303                async with self.worker_lock:
304                    self.worker.wait_for_subprocess_readines(block=False)
305
306                need_to_wait = False
307            except SubprocessIsNotReadyError:
308                await i(Sleep, 0.1)
309        
310        self.worker_is_ready = True
311    
312    async def wait_for_worker_readyness(self, i: Interface):
313        while not self.worker_is_ready:
314            await i(Sleep, 0.1)
315    
316    def start(self, wr: Optional[TkObjWrapper] = None):
317        if wr is None:
318            i = current_interface()
319            i(PutCoro, self._worker_readyness_waiting_coro)
320        else:
321            wr.put_coro(self._worker_readyness_waiting_coro)
322    
323    def stop(self):
324        self._stop = True
325        need_to_block = False
326        try:
327            i: Interface = current_interface()
328        except:
329            need_to_block = True
330        
331        with self.worker_lock:
332            if need_to_block:
333                self.worker.stop()
334            else:
335                run_in_thread_pool_fast(i, self.worker.stop)
336        
337    def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
338        i: Interface = current_interface()
339        self.filtered_paths = filtered_paths
340        with self.worker_lock:
341            if not self._stop:
342                self.worker.send_data_to_subprocess({
343                    'type': 'filter',
344                    'data': self.filtered_paths
345                })
346    
347    def __call__(self, data):
348        with self.worker_lock:
349            if not self.worker_is_ready:
350                # return data
351                return '<<Waiting for subprocess to start...>>'
352            
353            i: Interface = current_interface()
354            self.worker.send_data_to_subprocess({
355                'type': 'data',
356                'data': data
357            })
358            result = None
359            got_result = False
360            while (not got_result) and (not self._stop):
361                try:
362                    result = self.worker.get_answer_from_subprocess(block=False)
363                except Empty:
364                    pass
365                else:
366                    got_result = True
367                
368                if got_result:
369                    if 'data' != result['type']:
370                        got_result = False
371                
372                if not got_result:
373                    i(Sleep, 0.01)
374            
375            result = '' if result is None else result['data']
376            result = f'{i.coro_id}. CoroScheduler stats:\n{result}'
377            return result
378    
379    @staticmethod
380    def process_initializer(init_data) -> Any:
381        from pprintpp import pformat as pf
382        return init_data
383    
384    @staticmethod
385    def process_worker(global_data, data_msg):
386        data = data_msg['data']
387        if 'filter' == data_msg['type']:
388            if global_data is None:
389                global_data = dict()
390            
391            global_data['filtered_paths'] = data
392            return None  # no answer to parent process
393        elif 'data' == data_msg['type']:
394            filtered_paths = global_data['filtered_paths']
395            for path in filtered_paths:
396                try_del_dict_item(data, path)
397            
398            data = pf(data, indent=4, width=120)
399            data_msg['data'] = data
400            return data_msg
CSStatsFormatterMultiprocess(filtered_paths: typing.List[typing.Sequence[str]] = None)
273    def __init__(self, filtered_paths: List[Sequence[str]] = None):
274        self.filtered_paths: List[Sequence[str]] = [
275            ['CoroSchedulerBase', 'loop', 'coroutines execution times'],
276            ['CoroSchedulerBase', 'loop', 'longest continuous execution time of coroutines'],
277            ['CoroSchedulerGreenlet', 'loop', 'coroutines execution times'],
278            ['CoroSchedulerGreenlet', 'loop', 'longest continuous execution time of coroutines'],
279            ['CoroSchedulerAwaitable', 'loop', 'coroutines execution times'],
280            ['CoroSchedulerAwaitable', 'loop', 'longest continuous execution time of coroutines'],
281        ] if filtered_paths is None else filtered_paths
282        self._stop: bool = False
283        self._subprocess_started: bool = False
284        settings = SubprocessWorkerSettings()
285        settings.initiation_function = self.process_initializer
286        settings.working_function = self.process_worker
287        settings.transport = Transport.queue
288        settings.sendable_data_type = SendableDataType.marshalable
289        settings.use_internal_subprocess_input_buffer = True
290        settings.initialization_data = {
291            'filtered_paths': self.filtered_paths
292        }
293        
294        self.worker_lock: Lock = Lock(f'CSStatsFormatterMultiprocess.worker__{uuid4()}')
295        self.worker: SubprocessWorker = SubprocessWorker(settings)
296        self.worker.start(wait_for_process_readyness=False)
297        self.worker_is_ready: bool = False
filtered_paths: List[Sequence[str]]
worker_lock: cengal.parallel_execution.coroutines.coro_tools.lock.versions.v_0.lock.Lock
worker: cengal.parallel_execution.multiprocess.multiprocessing_task_runner.SubprocessWorker
worker_is_ready: bool
async def wait_for_worker_readyness( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface):
312    async def wait_for_worker_readyness(self, i: Interface):
313        while not self.worker_is_ready:
314            await i(Sleep, 0.1)
def start( self, wr: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.tkinter.versions.v_0.tkinter.TkObjWrapper, NoneType] = None):
316    def start(self, wr: Optional[TkObjWrapper] = None):
317        if wr is None:
318            i = current_interface()
319            i(PutCoro, self._worker_readyness_waiting_coro)
320        else:
321            wr.put_coro(self._worker_readyness_waiting_coro)
def stop(self):
323    def stop(self):
324        self._stop = True
325        need_to_block = False
326        try:
327            i: Interface = current_interface()
328        except:
329            need_to_block = True
330        
331        with self.worker_lock:
332            if need_to_block:
333                self.worker.stop()
334            else:
335                run_in_thread_pool_fast(i, self.worker.stop)
def update_filtered_paths(self, filtered_paths: typing.List[typing.Sequence[str]]):
337    def update_filtered_paths(self, filtered_paths: List[Sequence[str]]):
338        i: Interface = current_interface()
339        self.filtered_paths = filtered_paths
340        with self.worker_lock:
341            if not self._stop:
342                self.worker.send_data_to_subprocess({
343                    'type': 'filter',
344                    'data': self.filtered_paths
345                })
@staticmethod
def process_initializer(init_data) -> Any:
379    @staticmethod
380    def process_initializer(init_data) -> Any:
381        from pprintpp import pformat as pf
382        return init_data
@staticmethod
def process_worker(global_data, data_msg):
384    @staticmethod
385    def process_worker(global_data, data_msg):
386        data = data_msg['data']
387        if 'filter' == data_msg['type']:
388            if global_data is None:
389                global_data = dict()
390            
391            global_data['filtered_paths'] = data
392            return None  # no answer to parent process
393        elif 'data' == data_msg['type']:
394            filtered_paths = global_data['filtered_paths']
395            for path in filtered_paths:
396                try_del_dict_item(data, path)
397            
398            data = pf(data, indent=4, width=120)
399            data_msg['data'] = data
400            return data_msg
class CorosLogsProvider:
516class CorosLogsProvider:
517    def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
518        self.coros_logs_key: Hashable = coros_logs_key
519        self.logs_limit: Union[None, int] = logs_limit
520        self.period: float = period
521        self.i: Interface = None
522        self._update_coro_id: CoroID = None
523        self.force_stop_timeout: RationalNumber = force_stop_timeout
524        self.fac = FastAggregatorClient()
525        self._stop = False
526        self.lifetime_stats: Dict[CoroID, float] = dict()
527        self._current_logs_taken: bool = False
528        self._unsend_data: List[List[Tuple[Tuple, Dict]]] = list()
529        self._log_service_handler_was_added: bool = False
530    
531    def start(self):
532        if self.i is None:
533            self.i = current_interface()
534        
535        self._update_coro_id = self.i(PutCoro, self._update)
536    
537    def stop(self):
538        self._stop = True
539        current_interface()(PutCoro, self._force_stop)
540    
541    async def _force_stop(self, i: Interface):
542        if self._update_coro_id is not None:
543            update_coro_id = self._update_coro_id
544            self._update_coro_id = None
545            await agraceful_coro_destroyer(i, self.force_stop_timeout, update_coro_id)
546        
547        if self._log_service_handler_was_added:
548            self._log_service_handler_was_added = False
549            await i(LogRequest().remove_iteration_handler(self))
550        
551        self._unsend_data = type(self._unsend_data)()
552    
553    async def _update(self, i: Interface):
554        while not self._stop:
555            if not self._log_service_handler_was_added:
556                if i._loop.is_service_registered(Log):
557                    self._log_service_handler_was_added = True
558                    await i(LogRequest().add_iteration_handler(self))
559                    
560            logs_parts = self._unsend_data
561            self._unsend_data = type(self._unsend_data)()
562            for logs_part in logs_parts:
563                self.fac(self.coros_logs_key, logs_part)
564            
565            await i(Sleep, self.period)
566    
567    def __call__(self, log_service: Log, data: List[Tuple[Tuple, Dict, Dict[str, Any]]], current_time: float, current_time_str: str):
568        if self._current_logs_taken:
569            if self.logs_limit is None:
570                if data:
571                    self._unsend_data.append(data)
572            else:
573                num_of_an_initial_logs_needed: int = self.logs_limit - len(data)
574                if 0 <= num_of_an_initial_logs_needed:
575                    if data:
576                        self._unsend_data.append(data)
577                else:
578                    part_of_data = data[-self.logs_limit:]
579                    if part_of_data:
580                        self._unsend_data.append(part_of_data)
581        else:
582            self._current_logs_taken = True
583            if self.logs_limit is None:
584                combined_data = log_service.get_last_n_logs(None) + data
585                if combined_data:
586                    self._unsend_data.append(combined_data)
587            else:
588                num_of_an_initial_logs_needed: int = self.logs_limit - len(data)
589                if 0 < num_of_an_initial_logs_needed:
590                    combined_data = log_service.get_last_n_logs(num_of_an_initial_logs_needed) + data
591                    if combined_data:
592                        self._unsend_data.append(combined_data)
593                if 0 == num_of_an_initial_logs_needed:
594                    if data:
595                        self._unsend_data.append(data)
596                else:
597                    part_of_data = data[-self.logs_limit:]
598                    if part_of_data:
599                        self._unsend_data.append(part_of_data)
CorosLogsProvider( coros_logs_key: typing.Hashable, logs_limit: typing.Union[NoneType, int], period: float, force_stop_timeout: typing.Union[int, float] = 0.1)
517    def __init__(self, coros_logs_key: Hashable, logs_limit: Union[None, int], period: float, force_stop_timeout: RationalNumber = 0.1):
518        self.coros_logs_key: Hashable = coros_logs_key
519        self.logs_limit: Union[None, int] = logs_limit
520        self.period: float = period
521        self.i: Interface = None
522        self._update_coro_id: CoroID = None
523        self.force_stop_timeout: RationalNumber = force_stop_timeout
524        self.fac = FastAggregatorClient()
525        self._stop = False
526        self.lifetime_stats: Dict[CoroID, float] = dict()
527        self._current_logs_taken: bool = False
528        self._unsend_data: List[List[Tuple[Tuple, Dict]]] = list()
529        self._log_service_handler_was_added: bool = False
coros_logs_key: Hashable
logs_limit: Union[NoneType, int]
period: float
i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface
force_stop_timeout: Union[int, float]
fac
lifetime_stats: Dict[int, float]
def start(self):
531    def start(self):
532        if self.i is None:
533            self.i = current_interface()
534        
535        self._update_coro_id = self.i(PutCoro, self._update)
def stop(self):
537    def stop(self):
538        self._stop = True
539        current_interface()(PutCoro, self._force_stop)
class CoroLogsAppendFormatter(cengal.user_interface.gui.tkinter.components.aggregator_view.versions.v_0.aggregator_view.AggregatorAppendFormatter):
463class CoroLogsAppendFormatter(AggregatorAppendFormatter):
464    def __init__(self, initial_text: str, desired_parts: CoroLogsAppendFormatterParts = CoroLogsAppendFormatterParts.all) -> None:
465        super().__init__(initial_text)
466        self._prompt_string: str = f'>>> {"-"*80}'
467        self.desired_parts: CoroLogsAppendFormatterParts = desired_parts
468
469    def __call__(self, data: List[Tuple[Tuple, Dict, Dict[str, Any]]]) -> Any:
470        if data:
471            data_part = '\n\n'.join([self._format(*log_info) for log_info in data])
472            return super().__call__(f'\n{data_part}\n')
473        else:
474            return super().__call__(str())
475
476    def _format(self, args, kwargs, info=None) -> str:
477        if info is None:
478            return f'{self._prompt_string}\n{args_kwargs_to_str(args, kwargs)}'
479        else:
480            output_strings: List[str] = list()
481            if CoroLogsAppendFormatterParts.prompt_string in self.desired_parts:
482                output_strings.append(self._prompt_string)
483
484            if CoroLogsAppendFormatterParts.time in self.desired_parts:
485                output_strings.append(f'> Time: {info[InfoFields.current_time]}; Perf Counter: {info[InfoFields.perf_counter_time]:17.6f}')
486
487            if CoroLogsAppendFormatterParts.caller_info in self.desired_parts:
488                output_strings.append(f'> λ: {info[InfoFields.caller_info]}')
489
490            if CoroLogsAppendFormatterParts.logging_level in self.desired_parts:
491                if InfoFields.logging_level in info:
492                    output_strings.append(f'> Logging level: {info[InfoFields.logging_level]}')
493            
494            if CoroLogsAppendFormatterParts.log in self.desired_parts:
495                output_strings.append(args_kwargs_to_str(args, kwargs))
496
497            if CoroLogsAppendFormatterParts.coros_traceback in self.desired_parts:
498                coro_parents_strings: List[str] = info[InfoFields.coro_parents_strings]
499                if coro_parents_strings:
500                    coro_parents_text: str = '\n'.join(coro_parents_strings)
501                    output_strings.append(f'> Coros traceback:\n{coro_parents_text}')
502            
503            if CoroLogsAppendFormatterParts.file_name_and_line_number in self.desired_parts:
504                output_strings.append(f'> @ {info[InfoFields.file_name]}:{info[InfoFields.line_number]}')
505
506            if CoroLogsAppendFormatterParts.traceback in self.desired_parts:
507                traceback_strings: List[str] = info[InfoFields.traceback_strings]
508                if traceback_strings:
509                    traceback_text: str = '\n'.join(traceback_strings)
510                    traceback_text = traceback_text.strip('\n')
511                    output_strings.append(f'> Traceback:\n{traceback_text}')
512            
513            return '\n'.join(output_strings)
CoroLogsAppendFormatter( initial_text: str, desired_parts: CoroLogsAppendFormatterParts = <CoroLogsAppendFormatterParts.all: 255>)
464    def __init__(self, initial_text: str, desired_parts: CoroLogsAppendFormatterParts = CoroLogsAppendFormatterParts.all) -> None:
465        super().__init__(initial_text)
466        self._prompt_string: str = f'>>> {"-"*80}'
467        self.desired_parts: CoroLogsAppendFormatterParts = desired_parts
Inherited Members
cengal.user_interface.gui.tkinter.components.aggregator_view.versions.v_0.aggregator_view.AggregatorAppendFormatter
initial_text
initiated
reset
class CoroLogsAppendFormatterParts(enum.IntFlag):
450class CoroLogsAppendFormatterParts(IntFlag):
451    none = 0
452    prompt_string = auto()
453    time = auto()
454    caller_info = auto()
455    logging_level = auto()
456    log = auto()
457    coros_traceback = auto()
458    file_name_and_line_number = auto()
459    traceback = auto()
460    all = prompt_string | time | caller_info | logging_level | log | coros_traceback | file_name_and_line_number | traceback

An enumeration.

file_name_and_line_number = <CoroLogsAppendFormatterParts.file_name_and_line_number: 64>
Inherited Members
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
enum.Enum
name
value