cengal.parallel_execution.coroutines.coro_standard_services.log.versions.v_0.log
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['InfoFields', 'default_info_gatherer', 'LogExtended', 'LogEx', 'Log', 'LogRequest', 20 'view_log', 'clear_log', 'LogClient', 'default_log_client', 'log_fast', 'alog_fast', 21 'log', 'alog', 'put_log_fast', 'plog_fast', 'put_log', 'plog'] 22 23from cengal.parallel_execution.coroutines.coro_scheduler import * 24from cengal.parallel_execution.coroutines.coro_tools.await_coro import * 25from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import * 26from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority 27from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 28from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import * 29from cengal.parallel_execution.coroutines.coro_standard_services.instance import * 30from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType 31from cengal.file_system.path_manager import RelativePath 32from cengal.file_system.file_manager import path_relative_to_current_dir 33from cengal.time_management.cpu_clock_cycles import perf_counter 34from cengal.data_manipulation.serialization import * 35from cengal.introspection.inspect import get_exception, frame, entity_repr_owner_based, entity_name 36from cengal.code_flow_control.python_bytecode_manipulator import get_code 37from cengal.code_flow_control.args_manager import args_kwargs_to_str 38from cengal.code_flow_control.smart_values import ValueExistence 39from enum import IntEnum 40from traceback import format_stack 41from typing import Tuple, List, Any, Dict, Callable 42from datetime import datetime 43import logging 44import sys 45import os 46import asyncio 47try: 48 import lmdb 49except ImportError: 50 from warnings import warn 51 warn('''WARNING: `lmdb` library is not installed. Log service will not work. 52 To install `lmdb` use: `pip install lmdb`''') 53 raise 54 55 56""" 57Module Docstring 58Docstrings: http://www.python.org/dev/peps/pep-0257/ 59""" 60 61__author__ = "ButenkoMS <gtalk@butenkoms.space>" 62__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 63__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 64__license__ = "Apache License, Version 2.0" 65__version__ = "4.4.1" 66__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 67__email__ = "gtalk@butenkoms.space" 68# __status__ = "Prototype" 69__status__ = "Development" 70# __status__ = "Production" 71 72 73def get_coro_parents_path(coro_id: CoroID) -> List[CoroID]: 74 parents: Set[CoroID] = set() 75 result: List[CoroID] = list() 76 def handler(deep, child, parent, index): 77 if parent is not None: 78 coro_id = parent[1] 79 parents.add(coro_id) 80 result.append(coro_id) 81 82 try_travers_through_all_coro_parents(coro_id, handler) 83 return result 84 85 86def coro_info_string(cs: CoroSchedulerType, coro_id: CoroID) -> str: 87 coro: Optional[CoroWrapperBase] = cs.get_coro(coro_id) 88 if coro is None: 89 return f' CoroID: {coro_id:10}' 90 else: 91 coro_worker = coro.worker 92 if isinstance(coro_worker, GreenletWorkerWrapper): 93 coro_worker = coro_worker.worker 94 95 return f' CoroID: {coro_id:10}; Type: {"Awaitable" if isinstance(coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 96 97 98def get_coro_parents_strings(coro_id: CoroID) -> List[str]: 99 coro_parents_path: List[CoroID] = get_coro_parents_path(coro_id) 100 if coro_parents_path: 101 cs: CoroSchedulerType = get_current_coro_scheduler() 102 if cs is None: 103 return list([f' CoroID: {coro_id:10}' for coro_id in coro_parents_path]) 104 else: 105 return list([coro_info_string(cs, coro_id) for coro_id in coro_parents_path]) 106 else: 107 return list() 108 109 110class InfoFields(IntEnum): 111 current_time = 0 112 file_name = 1 113 line_number = 2 114 caller_info = 3 115 traceback_strings = 4 116 perf_counter_time = 5 117 coro_parents_strings = 6 118 logging_level = 7 119 120 121def default_info_gatherer(depth: int) -> Dict[str, Any]: 122 interested_frame = frame(depth + 1) 123 try: 124 interface: Interface = current_interface() 125 except OutsideCoroSchedulerContext: 126 interface = None 127 128 if interface is None: 129 caller_info = entity_repr_owner_based(interested_frame) 130 coro_parents_strings = list() 131 else: 132 coro_worker = interface._coro.worker 133 if isinstance(coro_worker, GreenletWorkerWrapper): 134 coro_worker = coro_worker.worker 135 136 caller_info = f'CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 137 coro_parents_strings = get_coro_parents_strings(interface.coro_id) 138 139 return { 140 InfoFields.current_time: datetime.now(), 141 InfoFields.perf_counter_time: perf_counter(), 142 InfoFields.file_name: interested_frame.f_code.co_filename, 143 InfoFields.line_number: interested_frame.f_lineno, 144 InfoFields.caller_info: caller_info, 145 InfoFields.traceback_strings: format_stack(interested_frame), 146 InfoFields.coro_parents_strings: coro_parents_strings, 147 } 148 149 150class LogExtended(TypedServiceRequest[None]): 151 default__request__type__ = 4 152 153 def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None, depth: Optional[int] = 1): 154 super().__init__() 155 self.info_gatherer: Optional[Callable] = info_gatherer 156 self.depth: Optional[int] = depth 157 158 def _copy(self) -> 'LogExtended': 159 return LogExtended(self.info_gatherer, self.depth) 160 161 def __call__(self, *args, **kwargs) -> 'LogEx': 162 if self.info_gatherer is None: 163 info = None 164 else: 165 info = self.info_gatherer(self.depth + 1) 166 167 return self._save_to_copy(self.default__request__type__, args, kwargs, info) 168 169 170LogEx = LogExtended 171 172 173class LogRequest(ServiceRequest): 174 def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest: 175 return self._save(0, path_to_db_environment) 176 177 def sync(self) -> ServiceRequest: 178 return self._save(1) 179 180 def add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest: 181 return self._save(2, handler) 182 183 def remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest: 184 return self._save(3, handler) 185 186 def log(self, *args, **kwargs) -> ServiceRequest: 187 return LogEx[None](default_info_gatherer, depth=2)(*args, **kwargs) 188 189 def connect_to_logger(self, logger_instance: logging.Logger) -> ServiceRequest: 190 return self._save(5, logger_instance) 191 192 def disconnect_from_logger(self, logger_instance: logging.Logger) -> ServiceRequest: 193 return self._save(6, logger_instance) 194 195 196class Log(TypedService[None], EntityStatsMixin): 197 def __init__(self, loop: CoroSchedulerType): 198 super(Log, self).__init__(loop) 199 self.default_logs_dir: str = 'log.db' 200 self.path_to_db_environment = None 201 self.app_name_waiter: CoroWrapperBase = None 202 self.root_path_to_log_environment_rel: RelativePath = None 203 self.db_environment: lmdb.Environment = None 204 self.db = None 205 self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list() 206 self.async_loop = None 207 self.log_counter = Counter() 208 self.sync_time_interval = 0.5 209 self.characters_in_counter = 16 210 self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode() 211 self.last_sync_time = perf_counter() 212 self.force_sync = False 213 self.write_locked = False 214 self.write_locked_coro_id: Optional[CoroID] = None 215 self.periodic_sync_started: bool = False 216 self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list() 217 self.new_iteration_handlers_num: int = 0 218 self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict() 219 self.logger: logging.Logger = self._loop.logger 220 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 221 # Tags.can_use_bytes, 222 # Tags.decode_str_as_str, 223 # Tags.decode_list_as_list, 224 # Tags.decode_bytes_as_bytes, 225 # Tags.superficial, 226 # Tags.current_platform, 227 # Tags.multi_platform), 228 # TestDataType.small, 229 # 0.1) 230 self.serializer = Serializer(Serializers.msgspec_messagepack) 231 232 self._request_workers = { 233 0: self._on_set_db_environment_path, 234 1: self._on_sync, 235 2: self._on_add_iteration_handler, 236 3: self._on_remove_iteration_handler, 237 4: self._on_log_extended, 238 5: self._on_connect_to_logger, 239 6: self._on_disconnect_from_logger, 240 } 241 self.inject_handler_to_logger(self._loop.logger) 242 243 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 244 return type(self).__name__, { 245 'log counter': self.log_counter._index, 246 'current counter state key': self.current_counter_state_key, 247 } 248 249 def put_log(self, args, kwargs): 250 self.log_queue.append((args, kwargs, None)) 251 # self.make_live 252 253 def put_log_ex(self, args, kwargs, info): 254 self.log_queue.append((args, kwargs, info)) 255 256 def destroy(self): 257 # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues, 258 # sync envirounments and close them. Also we need to prevent new requests from being processed. (see DB service) 259 loggers_instancess = list(self.logger_handlers.keys()) 260 for logger_instance in loggers_instancess: 261 self.eject_handler_from_logger(logger_instance) 262 263 if self.db_environment is not None: 264 self.db_environment.close() 265 266 def single_task_registration_or_immediate_processing( 267 self, *args, **kwargs) -> Tuple[bool, None, None]: 268 result = self.try_resolve_request(*args, **kwargs) 269 if result is None: 270 coro_info = self.current_caller_coro_info 271 coro_worker = coro_info.coro.worker 272 if isinstance(coro_worker, GreenletWorkerWrapper): 273 coro_worker = coro_worker.worker 274 275 caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 276 coro_worker_code = get_code(coro_worker) 277 info = { 278 InfoFields.current_time: datetime.now(), 279 InfoFields.perf_counter_time: perf_counter(), 280 InfoFields.file_name: coro_worker_code.co_filename, 281 InfoFields.line_number: coro_worker_code.co_firstlineno, 282 InfoFields.caller_info: caller_info, 283 InfoFields.traceback_strings: list(), 284 # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id), 285 InfoFields.coro_parents_strings: list(), 286 } 287 self.log_queue.append((args, kwargs, info)) 288 # self.make_live() 289 290 # TODO: we need to implement backpressure mechanism here. If we have too many pending requests, we need to put request to queue instead of responding immediately. 291 # However this will not be enough for a direct requests. We need to implement some kind of backpressure mechanism for direct requests too. 292 return True, None, None 293 else: 294 return result 295 296 def _ensure_default_db_environment(self) -> bool: 297 if self.db_environment is None: 298 if self.path_to_db_environment is None: 299 if self.app_name_waiter is None: 300 async def coro(i: Interface, self: 'Log'): 301 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 302 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 303 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 304 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 305 self.path_to_db_environment = RelativePath(app_data_dir_path)(self.default_logs_dir) 306 self._init_db() 307 self.app_name_waiter = None 308 self.make_live() 309 310 self.app_name_waiter = put_root_from_other_service(self, coro, self) 311 312 self.make_dead() 313 return False 314 else: 315 self._init_db() 316 return True 317 else: 318 return True 319 320 def full_processing_iteration(self): 321 if not self._ensure_default_db_environment(): 322 return 323 324 self.force_sync = False 325 log_queue_buff = self.log_queue 326 self.log_queue = type(log_queue_buff)() 327 current_time = perf_counter() 328 current_time_str = str(current_time) 329 for iteration_handler in self.iteration_handlers: 330 iteration_handler(self, log_queue_buff, current_time, current_time_str) 331 332 self.new_iteration_handlers_num = 0 333 334 def handler(): 335 with self.db_environment.begin(write=True) as txn: 336 for log_info in log_queue_buff: 337 key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode() 338 value = self.serializer.dumps(log_info) 339 txn.put(key, value, db=self.db, dupdata=True, append=True) 340 341 txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db) 342 343 lmdb_reapplier(self.db_environment, self.db, handler) 344 345 self.sync_in_thread_pool() 346 347 self.last_sync_time = perf_counter() 348 349 self.make_dead() 350 351 def in_work(self) -> bool: 352 result: bool = bool(self.log_queue) \ 353 or (self.db_environment is None) \ 354 or (not self.periodic_sync_started) \ 355 or self.new_iteration_handlers_num \ 356 or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 357 return self.thrifty_in_work(result) 358 359 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 360 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 361 if self.sync_time_interval > time_since_last_sync_time: 362 return True, self.sync_time_interval - time_since_last_sync_time 363 else: 364 return True, 0 365 366 def _init_db(self): 367 self.logger.info(f'Path to Log DB Env: {self.path_to_db_environment}') 368 self.db_environment = lmdb.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=2, 369 map_async=True, lock=False, metasync=False, sync=False, meminit=False) 370 self.db = self.db_environment.open_db(b'logs') 371 def handler(): 372 with self.db_environment.begin(write=True) as txn: 373 current_counter_state = txn.get(self.current_counter_state_key, db=self.db) 374 if current_counter_state is None: 375 txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db) 376 else: 377 self.log_counter._index = self.serializer.loads(current_counter_state) 378 379 lmdb_reapplier(self.db_environment, self.db, handler) 380 self.db_environment.sync(True) 381 382 def _on_set_db_environment_path(self, path_to_db_environment: str): 383 if self.write_locked: 384 return True, False, None 385 386 if self.db_environment is None: 387 self.path_to_db_environment = path_to_db_environment 388 try: 389 self._init_db() 390 except: 391 exception = get_exception() 392 return True, False, exception 393 return True, True, None 394 else: 395 return True, False, None 396 397 def _on_sync(self): 398 if self.db_environment is None: 399 self.make_live() 400 else: 401 if self.log_queue: 402 self.force_sync = True 403 self.make_live() 404 else: 405 # self.db_environment.sync(True) 406 self.sync_in_thread_pool() 407 408 return True, None, None 409 410 def _on_log_extended(self, args, kwargs, info): 411 self.log_queue.append((args, kwargs, info)) 412 return True, None, None 413 414 def _on_add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]): 415 self.iteration_handlers.append(handler) 416 self.new_iteration_handlers_num += 1 417 return True, None, None 418 419 def _on_remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]): 420 removed = False 421 try: 422 self.iteration_handlers.remove(handler) 423 removed = True 424 except ValueError: 425 pass 426 427 return True, removed, None 428 429 def sync_in_thread_pool(self): 430 async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool): 431 if need_to_ensure_asyncio_loop: 432 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True)) 433 else: 434 if asyncio_loop is None: 435 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get()) 436 437 async def sync_db(self, asyncio_loop): 438 def sync_worker(): 439 self.db_environment.sync(True) 440 441 await task_in_thread_pool(asyncio_loop, sync_worker) 442 443 await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop))) 444 self.write_locked_coro_id = None 445 self.write_locked = False 446 def make_service_live_for_a_next_sync(self: 'Log'): 447 self.periodic_sync_started = False 448 self.make_live() 449 450 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 451 452 asyncio_loop = None 453 need_to_ensure_asyncio_loop = False 454 try: 455 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 456 except AsyncioLoopWasNotSetError: 457 need_to_ensure_asyncio_loop = True 458 459 coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop) 460 self.write_locked = True 461 self.write_locked_coro_id = coro.coro_id 462 self.periodic_sync_started = True 463 464 def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]: 465 if self.db_environment is None: 466 return list() 467 468 if number is None: 469 number = self.log_counter._index + 1 470 elif number <= 0: 471 return list() 472 473 result = list() 474 with self.db_environment.begin(db=self.db) as txn: 475 txn.cursor().last() 476 for key, value in txn.cursor().iterprev(): 477 if key == self.current_counter_state_key: 478 continue 479 480 if number <= 0: 481 break 482 483 result.append(self.serializer.loads(value)) 484 number -= 1 485 486 result.reverse() 487 return result 488 489 def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool: 490 if logger_instance not in self.logger_handlers: 491 logger_handler: LoggingHandler = LoggingHandler(self) 492 self.logger_handlers[logger_instance] = logger_handler 493 logger_instance.addHandler(logger_handler) 494 return True 495 else: 496 return False 497 498 def _on_connect_to_logger(self, logger_instance: logging.Logger): 499 result = self.inject_handler_to_logger(logger_instance) 500 return True, result, None 501 502 def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool: 503 if logger_instance in self.logger_handlers: 504 logger_instance.removeHandler(self.logger_handlers[logger_instance]) 505 del self.logger_handlers[logger_instance] 506 return True 507 else: 508 return False 509 510 def _on_disconnect_from_logger(self, logger_instance: logging.Logger): 511 result = self.eject_handler_from_logger(logger_instance) 512 return True, result, None 513 514 515LogExtended.default_service_type = Log 516LogRequest.default_service_type = Log 517 518 519class LogClient: 520 def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None) -> None: 521 self.info_gatherer: Optional[Callable] = info_gatherer 522 self.log_extended_request: LogEx = LogEx(default_info_gatherer, depth=2) 523 self.extended: bool = True 524 525 def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]: 526 if self.extended: 527 i(self.log_extended_request(*args, **kwargs)) 528 else: 529 i(Log, *args, **kwargs) 530 531 return args[0] if args else None 532 533 async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]: 534 if self.extended: 535 await i(self.log_extended_request(*args, **kwargs)) 536 else: 537 await i(Log, *args, **kwargs) 538 539 return args[0] if args else None 540 541 def log(self, *args, **kwargs) -> Optional[Any]: 542 if self.extended: 543 current_interface()(self.log_extended_request(*args, **kwargs)) 544 else: 545 current_interface()(Log, *args, **kwargs) 546 547 return args[0] if args else None 548 549 async def alog(self, *args, **kwargs) -> Optional[Any]: 550 if self.extended: 551 await current_interface()(self.log_extended_request(*args, **kwargs)) 552 else: 553 await current_interface()(Log, *args, **kwargs) 554 555 return args[0] if args else None 556 557 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 558 if self.extended: 559 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 560 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 561 else: 562 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 563 564 return args[0] if args else None 565 566 plog_fast = put_log_fast 567 568 # async def aput_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs): 569 # if self.extended: 570 # log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 571 # scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 572 # else: 573 # scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 574 575 # aplog_fast = aput_log_fast 576 577 def put_log(self, *args, **kwargs) -> Optional[Any]: 578 return self.put_log_fast(current_coro_scheduler(), *args, **kwargs) 579 580 plog = put_log 581 582 # async def aput_log(self, *args, **kwargs): 583 # self.put_log_fast(current_coro_scheduler(), *args, **kwargs) 584 585 # aplog = aput_log 586 587 588default_log_client: LogClient = LogClient(default_info_gatherer) 589log_fast = default_log_client.log_fast 590alog_fast = default_log_client.alog_fast 591log = default_log_client.log 592alog = default_log_client.alog 593put_log_fast = default_log_client.put_log_fast 594plog_fast = default_log_client.plog_fast 595put_log = default_log_client.put_log 596plog = default_log_client.plog 597# aput_log_fast = default_log_client.aput_log_fast 598# aplog_fast = default_log_client.aplog_fast 599# aput_log = default_log_client.aput_log 600# aplog = default_log_client.aplog 601 602 603def view_log(path_to_db_environment: Optional[str]=None, file_to_redirect_output: Optional[str]=None): 604 if path_to_db_environment is None: 605 path_to_db_environment = path_relative_to_current_dir('log.db') 606 607 output_file = None 608 if file_to_redirect_output is not None: 609 output_file = open(file_to_redirect_output, 'wb') 610 611 try: 612 db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2) 613 db = db_environment.open_db(b'logs') 614 # serializer = best_serializer_for_standard_data((DataFormats.binary, 615 # Tags.can_use_bytes, 616 # Tags.decode_str_as_str, 617 # Tags.decode_list_as_list, 618 # Tags.decode_bytes_as_bytes, 619 # Tags.superficial, 620 # Tags.current_platform, 621 # Tags.multi_platform), 622 # TestDataType.small, 623 # 0.1) 624 serializer = Serializer(Serializers.msgspec_messagepack) 625 with db_environment.begin() as txn: 626 for key, value in txn.cursor(db=db): 627 if key == f'{str(0).zfill(16)}'.encode(): 628 if output_file is None: 629 print(f'λλλ <<< {serializer.loads(value)} >>>') 630 print() 631 else: 632 output_file.write(f'λλλ <<< {serializer.loads(value)} >>>'.encode()) 633 output_file.write('\n'.encode()) 634 else: 635 args, kwargs, info = serializer.loads(value) 636 if output_file is None: 637 print(f'λ >>>\t{key}: {"~"*8}') 638 print(*args, **kwargs) 639 print(info) 640 print() 641 else: 642 output_file.write(f'λ >>>\t{key}: {"~"*8}\n'.encode()) 643 output_file.write(f'{args_kwargs_to_str(args, kwargs)}\n'.encode()) 644 output_file.write('\n'.encode()) 645 finally: 646 if output_file is not None: 647 output_file.close() 648 649 650def clear_log(path_to_db_environment: Optional[str]=None): 651 if path_to_db_environment is None: 652 path_to_db_environment = path_relative_to_current_dir('log.db') 653 db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2) 654 db = db_environment.open_db(b'logs') 655 def handler(): 656 with db_environment.begin(write=True) as txn: 657 txn.drop(db=db, delete=False) 658 lmdb_reapplier(db_environment, db, handler) 659 660 661def lmdb_reapplier(environment: lmdb.Environment, db, handler: Callable): 662 failed = True 663 while failed: 664 need_to_drop = False 665 try: 666 handler() 667 failed = False 668 except lmdb.MapFullError: 669 need_to_drop = True 670 671 if need_to_drop: 672 environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2) 673 with environment.begin(write=True) as txn: 674 txn.drop(db=db, delete=False) 675 676 677 678class LoggingHandler(logging.Handler): 679 def __init__(self, log_service: Log, *args, **kwargs): 680 super().__init__(*args, **kwargs) 681 self.log_service: Log = log_service 682 self.cs: CoroSchedulerType = log_service._loop 683 684 def emit(self, record: logging.LogRecord): 685 log_entry = self.format(record) 686 interested_frame = frame(7) # TODO: mostly correct at least for a Python 3.8.10 (wrong at least for `warn()` and `exception()` logger methods) 687 try: 688 interface: Interface = current_interface() 689 except OutsideCoroSchedulerContext: 690 interface = None 691 692 if interface is None: 693 caller_info = f'FuncName: {record.funcName}' 694 coro_parents_strings = list() 695 else: 696 coro_worker = interface._coro.worker 697 if isinstance(coro_worker, GreenletWorkerWrapper): 698 coro_worker = coro_worker.worker 699 700 caller_info = f'FuncName: {record.funcName}; CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 701 coro_parents_strings = get_coro_parents_strings(interface.coro_id) 702 self.log_service.put_log_ex((log_entry, ), dict(), { 703 InfoFields.current_time: datetime.fromtimestamp(record.created), 704 InfoFields.perf_counter_time: perf_counter(), 705 InfoFields.file_name: record.filename, 706 InfoFields.line_number: record.lineno, 707 InfoFields.caller_info: caller_info, 708 InfoFields.traceback_strings: format_stack(interested_frame), 709 InfoFields.coro_parents_strings: coro_parents_strings, 710 InfoFields.logging_level: record.levelname, 711 })
111class InfoFields(IntEnum): 112 current_time = 0 113 file_name = 1 114 line_number = 2 115 caller_info = 3 116 traceback_strings = 4 117 perf_counter_time = 5 118 coro_parents_strings = 6 119 logging_level = 7
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
122def default_info_gatherer(depth: int) -> Dict[str, Any]: 123 interested_frame = frame(depth + 1) 124 try: 125 interface: Interface = current_interface() 126 except OutsideCoroSchedulerContext: 127 interface = None 128 129 if interface is None: 130 caller_info = entity_repr_owner_based(interested_frame) 131 coro_parents_strings = list() 132 else: 133 coro_worker = interface._coro.worker 134 if isinstance(coro_worker, GreenletWorkerWrapper): 135 coro_worker = coro_worker.worker 136 137 caller_info = f'CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 138 coro_parents_strings = get_coro_parents_strings(interface.coro_id) 139 140 return { 141 InfoFields.current_time: datetime.now(), 142 InfoFields.perf_counter_time: perf_counter(), 143 InfoFields.file_name: interested_frame.f_code.co_filename, 144 InfoFields.line_number: interested_frame.f_lineno, 145 InfoFields.caller_info: caller_info, 146 InfoFields.traceback_strings: format_stack(interested_frame), 147 InfoFields.coro_parents_strings: coro_parents_strings, 148 }
151class LogExtended(TypedServiceRequest[None]): 152 default__request__type__ = 4 153 154 def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None, depth: Optional[int] = 1): 155 super().__init__() 156 self.info_gatherer: Optional[Callable] = info_gatherer 157 self.depth: Optional[int] = depth 158 159 def _copy(self) -> 'LogExtended': 160 return LogExtended(self.info_gatherer, self.depth) 161 162 def __call__(self, *args, **kwargs) -> 'LogEx': 163 if self.info_gatherer is None: 164 info = None 165 else: 166 info = self.info_gatherer(self.depth + 1) 167 168 return self._save_to_copy(self.default__request__type__, args, kwargs, info)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
197class Log(TypedService[None], EntityStatsMixin): 198 def __init__(self, loop: CoroSchedulerType): 199 super(Log, self).__init__(loop) 200 self.default_logs_dir: str = 'log.db' 201 self.path_to_db_environment = None 202 self.app_name_waiter: CoroWrapperBase = None 203 self.root_path_to_log_environment_rel: RelativePath = None 204 self.db_environment: lmdb.Environment = None 205 self.db = None 206 self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list() 207 self.async_loop = None 208 self.log_counter = Counter() 209 self.sync_time_interval = 0.5 210 self.characters_in_counter = 16 211 self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode() 212 self.last_sync_time = perf_counter() 213 self.force_sync = False 214 self.write_locked = False 215 self.write_locked_coro_id: Optional[CoroID] = None 216 self.periodic_sync_started: bool = False 217 self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list() 218 self.new_iteration_handlers_num: int = 0 219 self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict() 220 self.logger: logging.Logger = self._loop.logger 221 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 222 # Tags.can_use_bytes, 223 # Tags.decode_str_as_str, 224 # Tags.decode_list_as_list, 225 # Tags.decode_bytes_as_bytes, 226 # Tags.superficial, 227 # Tags.current_platform, 228 # Tags.multi_platform), 229 # TestDataType.small, 230 # 0.1) 231 self.serializer = Serializer(Serializers.msgspec_messagepack) 232 233 self._request_workers = { 234 0: self._on_set_db_environment_path, 235 1: self._on_sync, 236 2: self._on_add_iteration_handler, 237 3: self._on_remove_iteration_handler, 238 4: self._on_log_extended, 239 5: self._on_connect_to_logger, 240 6: self._on_disconnect_from_logger, 241 } 242 self.inject_handler_to_logger(self._loop.logger) 243 244 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 245 return type(self).__name__, { 246 'log counter': self.log_counter._index, 247 'current counter state key': self.current_counter_state_key, 248 } 249 250 def put_log(self, args, kwargs): 251 self.log_queue.append((args, kwargs, None)) 252 # self.make_live 253 254 def put_log_ex(self, args, kwargs, info): 255 self.log_queue.append((args, kwargs, info)) 256 257 def destroy(self): 258 # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues, 259 # sync envirounments and close them. Also we need to prevent new requests from being processed. (see DB service) 260 loggers_instancess = list(self.logger_handlers.keys()) 261 for logger_instance in loggers_instancess: 262 self.eject_handler_from_logger(logger_instance) 263 264 if self.db_environment is not None: 265 self.db_environment.close() 266 267 def single_task_registration_or_immediate_processing( 268 self, *args, **kwargs) -> Tuple[bool, None, None]: 269 result = self.try_resolve_request(*args, **kwargs) 270 if result is None: 271 coro_info = self.current_caller_coro_info 272 coro_worker = coro_info.coro.worker 273 if isinstance(coro_worker, GreenletWorkerWrapper): 274 coro_worker = coro_worker.worker 275 276 caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 277 coro_worker_code = get_code(coro_worker) 278 info = { 279 InfoFields.current_time: datetime.now(), 280 InfoFields.perf_counter_time: perf_counter(), 281 InfoFields.file_name: coro_worker_code.co_filename, 282 InfoFields.line_number: coro_worker_code.co_firstlineno, 283 InfoFields.caller_info: caller_info, 284 InfoFields.traceback_strings: list(), 285 # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id), 286 InfoFields.coro_parents_strings: list(), 287 } 288 self.log_queue.append((args, kwargs, info)) 289 # self.make_live() 290 291 # TODO: we need to implement backpressure mechanism here. If we have too many pending requests, we need to put request to queue instead of responding immediately. 292 # However this will not be enough for a direct requests. We need to implement some kind of backpressure mechanism for direct requests too. 293 return True, None, None 294 else: 295 return result 296 297 def _ensure_default_db_environment(self) -> bool: 298 if self.db_environment is None: 299 if self.path_to_db_environment is None: 300 if self.app_name_waiter is None: 301 async def coro(i: Interface, self: 'Log'): 302 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 303 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 304 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 305 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 306 self.path_to_db_environment = RelativePath(app_data_dir_path)(self.default_logs_dir) 307 self._init_db() 308 self.app_name_waiter = None 309 self.make_live() 310 311 self.app_name_waiter = put_root_from_other_service(self, coro, self) 312 313 self.make_dead() 314 return False 315 else: 316 self._init_db() 317 return True 318 else: 319 return True 320 321 def full_processing_iteration(self): 322 if not self._ensure_default_db_environment(): 323 return 324 325 self.force_sync = False 326 log_queue_buff = self.log_queue 327 self.log_queue = type(log_queue_buff)() 328 current_time = perf_counter() 329 current_time_str = str(current_time) 330 for iteration_handler in self.iteration_handlers: 331 iteration_handler(self, log_queue_buff, current_time, current_time_str) 332 333 self.new_iteration_handlers_num = 0 334 335 def handler(): 336 with self.db_environment.begin(write=True) as txn: 337 for log_info in log_queue_buff: 338 key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode() 339 value = self.serializer.dumps(log_info) 340 txn.put(key, value, db=self.db, dupdata=True, append=True) 341 342 txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db) 343 344 lmdb_reapplier(self.db_environment, self.db, handler) 345 346 self.sync_in_thread_pool() 347 348 self.last_sync_time = perf_counter() 349 350 self.make_dead() 351 352 def in_work(self) -> bool: 353 result: bool = bool(self.log_queue) \ 354 or (self.db_environment is None) \ 355 or (not self.periodic_sync_started) \ 356 or self.new_iteration_handlers_num \ 357 or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 358 return self.thrifty_in_work(result) 359 360 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 361 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 362 if self.sync_time_interval > time_since_last_sync_time: 363 return True, self.sync_time_interval - time_since_last_sync_time 364 else: 365 return True, 0 366 367 def _init_db(self): 368 self.logger.info(f'Path to Log DB Env: {self.path_to_db_environment}') 369 self.db_environment = lmdb.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=2, 370 map_async=True, lock=False, metasync=False, sync=False, meminit=False) 371 self.db = self.db_environment.open_db(b'logs') 372 def handler(): 373 with self.db_environment.begin(write=True) as txn: 374 current_counter_state = txn.get(self.current_counter_state_key, db=self.db) 375 if current_counter_state is None: 376 txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db) 377 else: 378 self.log_counter._index = self.serializer.loads(current_counter_state) 379 380 lmdb_reapplier(self.db_environment, self.db, handler) 381 self.db_environment.sync(True) 382 383 def _on_set_db_environment_path(self, path_to_db_environment: str): 384 if self.write_locked: 385 return True, False, None 386 387 if self.db_environment is None: 388 self.path_to_db_environment = path_to_db_environment 389 try: 390 self._init_db() 391 except: 392 exception = get_exception() 393 return True, False, exception 394 return True, True, None 395 else: 396 return True, False, None 397 398 def _on_sync(self): 399 if self.db_environment is None: 400 self.make_live() 401 else: 402 if self.log_queue: 403 self.force_sync = True 404 self.make_live() 405 else: 406 # self.db_environment.sync(True) 407 self.sync_in_thread_pool() 408 409 return True, None, None 410 411 def _on_log_extended(self, args, kwargs, info): 412 self.log_queue.append((args, kwargs, info)) 413 return True, None, None 414 415 def _on_add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]): 416 self.iteration_handlers.append(handler) 417 self.new_iteration_handlers_num += 1 418 return True, None, None 419 420 def _on_remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]): 421 removed = False 422 try: 423 self.iteration_handlers.remove(handler) 424 removed = True 425 except ValueError: 426 pass 427 428 return True, removed, None 429 430 def sync_in_thread_pool(self): 431 async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool): 432 if need_to_ensure_asyncio_loop: 433 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True)) 434 else: 435 if asyncio_loop is None: 436 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get()) 437 438 async def sync_db(self, asyncio_loop): 439 def sync_worker(): 440 self.db_environment.sync(True) 441 442 await task_in_thread_pool(asyncio_loop, sync_worker) 443 444 await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop))) 445 self.write_locked_coro_id = None 446 self.write_locked = False 447 def make_service_live_for_a_next_sync(self: 'Log'): 448 self.periodic_sync_started = False 449 self.make_live() 450 451 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 452 453 asyncio_loop = None 454 need_to_ensure_asyncio_loop = False 455 try: 456 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 457 except AsyncioLoopWasNotSetError: 458 need_to_ensure_asyncio_loop = True 459 460 coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop) 461 self.write_locked = True 462 self.write_locked_coro_id = coro.coro_id 463 self.periodic_sync_started = True 464 465 def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]: 466 if self.db_environment is None: 467 return list() 468 469 if number is None: 470 number = self.log_counter._index + 1 471 elif number <= 0: 472 return list() 473 474 result = list() 475 with self.db_environment.begin(db=self.db) as txn: 476 txn.cursor().last() 477 for key, value in txn.cursor().iterprev(): 478 if key == self.current_counter_state_key: 479 continue 480 481 if number <= 0: 482 break 483 484 result.append(self.serializer.loads(value)) 485 number -= 1 486 487 result.reverse() 488 return result 489 490 def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool: 491 if logger_instance not in self.logger_handlers: 492 logger_handler: LoggingHandler = LoggingHandler(self) 493 self.logger_handlers[logger_instance] = logger_handler 494 logger_instance.addHandler(logger_handler) 495 return True 496 else: 497 return False 498 499 def _on_connect_to_logger(self, logger_instance: logging.Logger): 500 result = self.inject_handler_to_logger(logger_instance) 501 return True, result, None 502 503 def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool: 504 if logger_instance in self.logger_handlers: 505 logger_instance.removeHandler(self.logger_handlers[logger_instance]) 506 del self.logger_handlers[logger_instance] 507 return True 508 else: 509 return False 510 511 def _on_disconnect_from_logger(self, logger_instance: logging.Logger): 512 result = self.eject_handler_from_logger(logger_instance) 513 return True, result, None
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
198 def __init__(self, loop: CoroSchedulerType): 199 super(Log, self).__init__(loop) 200 self.default_logs_dir: str = 'log.db' 201 self.path_to_db_environment = None 202 self.app_name_waiter: CoroWrapperBase = None 203 self.root_path_to_log_environment_rel: RelativePath = None 204 self.db_environment: lmdb.Environment = None 205 self.db = None 206 self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list() 207 self.async_loop = None 208 self.log_counter = Counter() 209 self.sync_time_interval = 0.5 210 self.characters_in_counter = 16 211 self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode() 212 self.last_sync_time = perf_counter() 213 self.force_sync = False 214 self.write_locked = False 215 self.write_locked_coro_id: Optional[CoroID] = None 216 self.periodic_sync_started: bool = False 217 self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list() 218 self.new_iteration_handlers_num: int = 0 219 self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict() 220 self.logger: logging.Logger = self._loop.logger 221 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 222 # Tags.can_use_bytes, 223 # Tags.decode_str_as_str, 224 # Tags.decode_list_as_list, 225 # Tags.decode_bytes_as_bytes, 226 # Tags.superficial, 227 # Tags.current_platform, 228 # Tags.multi_platform), 229 # TestDataType.small, 230 # 0.1) 231 self.serializer = Serializer(Serializers.msgspec_messagepack) 232 233 self._request_workers = { 234 0: self._on_set_db_environment_path, 235 1: self._on_sync, 236 2: self._on_add_iteration_handler, 237 3: self._on_remove_iteration_handler, 238 4: self._on_log_extended, 239 5: self._on_connect_to_logger, 240 6: self._on_disconnect_from_logger, 241 } 242 self.inject_handler_to_logger(self._loop.logger)
257 def destroy(self): 258 # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues, 259 # sync envirounments and close them. Also we need to prevent new requests from being processed. (see DB service) 260 loggers_instancess = list(self.logger_handlers.keys()) 261 for logger_instance in loggers_instancess: 262 self.eject_handler_from_logger(logger_instance) 263 264 if self.db_environment is not None: 265 self.db_environment.close()
267 def single_task_registration_or_immediate_processing( 268 self, *args, **kwargs) -> Tuple[bool, None, None]: 269 result = self.try_resolve_request(*args, **kwargs) 270 if result is None: 271 coro_info = self.current_caller_coro_info 272 coro_worker = coro_info.coro.worker 273 if isinstance(coro_worker, GreenletWorkerWrapper): 274 coro_worker = coro_worker.worker 275 276 caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}' 277 coro_worker_code = get_code(coro_worker) 278 info = { 279 InfoFields.current_time: datetime.now(), 280 InfoFields.perf_counter_time: perf_counter(), 281 InfoFields.file_name: coro_worker_code.co_filename, 282 InfoFields.line_number: coro_worker_code.co_firstlineno, 283 InfoFields.caller_info: caller_info, 284 InfoFields.traceback_strings: list(), 285 # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id), 286 InfoFields.coro_parents_strings: list(), 287 } 288 self.log_queue.append((args, kwargs, info)) 289 # self.make_live() 290 291 # TODO: we need to implement backpressure mechanism here. If we have too many pending requests, we need to put request to queue instead of responding immediately. 292 # However this will not be enough for a direct requests. We need to implement some kind of backpressure mechanism for direct requests too. 293 return True, None, None 294 else: 295 return result
321 def full_processing_iteration(self): 322 if not self._ensure_default_db_environment(): 323 return 324 325 self.force_sync = False 326 log_queue_buff = self.log_queue 327 self.log_queue = type(log_queue_buff)() 328 current_time = perf_counter() 329 current_time_str = str(current_time) 330 for iteration_handler in self.iteration_handlers: 331 iteration_handler(self, log_queue_buff, current_time, current_time_str) 332 333 self.new_iteration_handlers_num = 0 334 335 def handler(): 336 with self.db_environment.begin(write=True) as txn: 337 for log_info in log_queue_buff: 338 key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode() 339 value = self.serializer.dumps(log_info) 340 txn.put(key, value, db=self.db, dupdata=True, append=True) 341 342 txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db) 343 344 lmdb_reapplier(self.db_environment, self.db, handler) 345 346 self.sync_in_thread_pool() 347 348 self.last_sync_time = perf_counter() 349 350 self.make_dead()
352 def in_work(self) -> bool: 353 result: bool = bool(self.log_queue) \ 354 or (self.db_environment is None) \ 355 or (not self.periodic_sync_started) \ 356 or self.new_iteration_handlers_num \ 357 or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 358 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
360 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 361 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 362 if self.sync_time_interval > time_since_last_sync_time: 363 return True, self.sync_time_interval - time_since_last_sync_time 364 else: 365 return True, 0
430 def sync_in_thread_pool(self): 431 async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool): 432 if need_to_ensure_asyncio_loop: 433 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True)) 434 else: 435 if asyncio_loop is None: 436 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get()) 437 438 async def sync_db(self, asyncio_loop): 439 def sync_worker(): 440 self.db_environment.sync(True) 441 442 await task_in_thread_pool(asyncio_loop, sync_worker) 443 444 await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop))) 445 self.write_locked_coro_id = None 446 self.write_locked = False 447 def make_service_live_for_a_next_sync(self: 'Log'): 448 self.periodic_sync_started = False 449 self.make_live() 450 451 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 452 453 asyncio_loop = None 454 need_to_ensure_asyncio_loop = False 455 try: 456 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 457 except AsyncioLoopWasNotSetError: 458 need_to_ensure_asyncio_loop = True 459 460 coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop) 461 self.write_locked = True 462 self.write_locked_coro_id = coro.coro_id 463 self.periodic_sync_started = True
465 def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]: 466 if self.db_environment is None: 467 return list() 468 469 if number is None: 470 number = self.log_counter._index + 1 471 elif number <= 0: 472 return list() 473 474 result = list() 475 with self.db_environment.begin(db=self.db) as txn: 476 txn.cursor().last() 477 for key, value in txn.cursor().iterprev(): 478 if key == self.current_counter_state_key: 479 continue 480 481 if number <= 0: 482 break 483 484 result.append(self.serializer.loads(value)) 485 number -= 1 486 487 result.reverse() 488 return result
490 def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool: 491 if logger_instance not in self.logger_handlers: 492 logger_handler: LoggingHandler = LoggingHandler(self) 493 self.logger_handlers[logger_instance] = logger_handler 494 logger_instance.addHandler(logger_handler) 495 return True 496 else: 497 return False
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
- get_entity_stats
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
174class LogRequest(ServiceRequest): 175 def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest: 176 return self._save(0, path_to_db_environment) 177 178 def sync(self) -> ServiceRequest: 179 return self._save(1) 180 181 def add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest: 182 return self._save(2, handler) 183 184 def remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest: 185 return self._save(3, handler) 186 187 def log(self, *args, **kwargs) -> ServiceRequest: 188 return LogEx[None](default_info_gatherer, depth=2)(*args, **kwargs) 189 190 def connect_to_logger(self, logger_instance: logging.Logger) -> ServiceRequest: 191 return self._save(5, logger_instance) 192 193 def disconnect_from_logger(self, logger_instance: logging.Logger) -> ServiceRequest: 194 return self._save(6, logger_instance)
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
604def view_log(path_to_db_environment: Optional[str]=None, file_to_redirect_output: Optional[str]=None): 605 if path_to_db_environment is None: 606 path_to_db_environment = path_relative_to_current_dir('log.db') 607 608 output_file = None 609 if file_to_redirect_output is not None: 610 output_file = open(file_to_redirect_output, 'wb') 611 612 try: 613 db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2) 614 db = db_environment.open_db(b'logs') 615 # serializer = best_serializer_for_standard_data((DataFormats.binary, 616 # Tags.can_use_bytes, 617 # Tags.decode_str_as_str, 618 # Tags.decode_list_as_list, 619 # Tags.decode_bytes_as_bytes, 620 # Tags.superficial, 621 # Tags.current_platform, 622 # Tags.multi_platform), 623 # TestDataType.small, 624 # 0.1) 625 serializer = Serializer(Serializers.msgspec_messagepack) 626 with db_environment.begin() as txn: 627 for key, value in txn.cursor(db=db): 628 if key == f'{str(0).zfill(16)}'.encode(): 629 if output_file is None: 630 print(f'λλλ <<< {serializer.loads(value)} >>>') 631 print() 632 else: 633 output_file.write(f'λλλ <<< {serializer.loads(value)} >>>'.encode()) 634 output_file.write('\n'.encode()) 635 else: 636 args, kwargs, info = serializer.loads(value) 637 if output_file is None: 638 print(f'λ >>>\t{key}: {"~"*8}') 639 print(*args, **kwargs) 640 print(info) 641 print() 642 else: 643 output_file.write(f'λ >>>\t{key}: {"~"*8}\n'.encode()) 644 output_file.write(f'{args_kwargs_to_str(args, kwargs)}\n'.encode()) 645 output_file.write('\n'.encode()) 646 finally: 647 if output_file is not None: 648 output_file.close()
651def clear_log(path_to_db_environment: Optional[str]=None): 652 if path_to_db_environment is None: 653 path_to_db_environment = path_relative_to_current_dir('log.db') 654 db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2) 655 db = db_environment.open_db(b'logs') 656 def handler(): 657 with db_environment.begin(write=True) as txn: 658 txn.drop(db=db, delete=False) 659 lmdb_reapplier(db_environment, db, handler)
520class LogClient: 521 def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None) -> None: 522 self.info_gatherer: Optional[Callable] = info_gatherer 523 self.log_extended_request: LogEx = LogEx(default_info_gatherer, depth=2) 524 self.extended: bool = True 525 526 def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]: 527 if self.extended: 528 i(self.log_extended_request(*args, **kwargs)) 529 else: 530 i(Log, *args, **kwargs) 531 532 return args[0] if args else None 533 534 async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]: 535 if self.extended: 536 await i(self.log_extended_request(*args, **kwargs)) 537 else: 538 await i(Log, *args, **kwargs) 539 540 return args[0] if args else None 541 542 def log(self, *args, **kwargs) -> Optional[Any]: 543 if self.extended: 544 current_interface()(self.log_extended_request(*args, **kwargs)) 545 else: 546 current_interface()(Log, *args, **kwargs) 547 548 return args[0] if args else None 549 550 async def alog(self, *args, **kwargs) -> Optional[Any]: 551 if self.extended: 552 await current_interface()(self.log_extended_request(*args, **kwargs)) 553 else: 554 await current_interface()(Log, *args, **kwargs) 555 556 return args[0] if args else None 557 558 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 559 if self.extended: 560 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 561 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 562 else: 563 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 564 565 return args[0] if args else None 566 567 plog_fast = put_log_fast 568 569 # async def aput_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs): 570 # if self.extended: 571 # log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 572 # scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 573 # else: 574 # scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 575 576 # aplog_fast = aput_log_fast 577 578 def put_log(self, *args, **kwargs) -> Optional[Any]: 579 return self.put_log_fast(current_coro_scheduler(), *args, **kwargs) 580 581 plog = put_log 582 583 # async def aput_log(self, *args, **kwargs): 584 # self.put_log_fast(current_coro_scheduler(), *args, **kwargs) 585 586 # aplog = aput_log
558 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 559 if self.extended: 560 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 561 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 562 else: 563 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 564 565 return args[0] if args else None
558 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 559 if self.extended: 560 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 561 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 562 else: 563 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 564 565 return args[0] if args else None
558 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 559 if self.extended: 560 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 561 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 562 else: 563 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 564 565 return args[0] if args else None
558 def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]: 559 if self.extended: 560 log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs) 561 scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs) 562 else: 563 scheduler.get_service_instance_fast(Log).put_log(args, kwargs) 564 565 return args[0] if args else None