cengal.parallel_execution.coroutines.coro_standard_services.log.versions.v_0.log

#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['InfoFields', 'default_info_gatherer', 'LogExtended', 'LogEx', 'Log', 'LogRequest', 
           'view_log', 'clear_log', 'LogClient', 'default_log_client', 'log_fast', 'alog_fast', 
           'log', 'alog', 'put_log_fast', 'plog_fast', 'put_log', 'plog']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
from cengal.parallel_execution.coroutines.coro_standard_services.instance import *
from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType
from cengal.file_system.path_manager import RelativePath
from cengal.file_system.file_manager import path_relative_to_current_dir
from cengal.time_management.cpu_clock_cycles import perf_counter
from cengal.data_manipulation.serialization import *
from cengal.introspection.inspect import get_exception, frame, entity_repr_owner_based, entity_name
from cengal.code_flow_control.python_bytecode_manipulator import get_code
from cengal.code_flow_control.args_manager import args_kwargs_to_str
from cengal.code_flow_control.smart_values import ValueExistence
from enum import IntEnum
from traceback import format_stack
from typing import Tuple, List, Set, Any, Dict, Callable, Optional, Union  # `Set`, `Optional` and `Union` are used below
from datetime import datetime
import logging
import sys
import os
import asyncio
try:
    import lmdb
except ImportError:
    from warnings import warn
    warn('''WARNING: `lmdb` library is not installed. Log service will not work.
         To install `lmdb` use: `pip install lmdb`''')
    raise


"""
Log: a coroutine standard service that buffers log records and persists them to an LMDB
database. It can also capture records from the standard `logging` module via injected handlers.
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


def get_coro_parents_path(coro_id: CoroID) -> List[CoroID]:
    parents: Set[CoroID] = set()
    result: List[CoroID] = list()
    def handler(deep, child, parent, index):
        if parent is not None:
            parent_coro_id = parent[1]
            if parent_coro_id not in parents:  # report each parent only once
                parents.add(parent_coro_id)
                result.append(parent_coro_id)
    
    try_travers_through_all_coro_parents(coro_id, handler)
    return result

def coro_info_string(cs: CoroSchedulerType, coro_id: CoroID) -> str:
    coro: Optional[CoroWrapperBase] = cs.get_coro(coro_id)
    if coro is None:
        return f'  CoroID: {coro_id:10}'
    else:
        coro_worker = coro.worker
        if isinstance(coro_worker, GreenletWorkerWrapper):
            coro_worker = coro_worker.worker
        
        return f'  CoroID: {coro_id:10}; Type: {"Awaitable" if isinstance(coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'


def get_coro_parents_strings(coro_id: CoroID) -> List[str]:
    coro_parents_path: List[CoroID] = get_coro_parents_path(coro_id)
    if coro_parents_path:
        cs: CoroSchedulerType = get_current_coro_scheduler()
        if cs is None:
            return [f'  CoroID: {coro_id:10}' for coro_id in coro_parents_path]
        else:
            return [coro_info_string(cs, coro_id) for coro_id in coro_parents_path]
    else:
        return list()


class InfoFields(IntEnum):
    current_time = 0
    file_name = 1
    line_number = 2
    caller_info = 3
    traceback_strings = 4
    perf_counter_time = 5
    coro_parents_strings = 6
    logging_level = 7


def default_info_gatherer(depth: int) -> Dict[str, Any]:
    interested_frame = frame(depth + 1)
    try:
        interface: Interface = current_interface()
    except OutsideCoroSchedulerContext:
        interface = None

    if interface is None:
        caller_info = entity_repr_owner_based(interested_frame)
        coro_parents_strings = list()
    else:
        coro_worker = interface._coro.worker
        if isinstance(coro_worker, GreenletWorkerWrapper):
            coro_worker = coro_worker.worker
        
        caller_info = f'CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
        coro_parents_strings = get_coro_parents_strings(interface.coro_id)
    
    return {
        InfoFields.current_time: datetime.now(),
        InfoFields.perf_counter_time: perf_counter(),
        InfoFields.file_name: interested_frame.f_code.co_filename,
        InfoFields.line_number: interested_frame.f_lineno,
        InfoFields.caller_info: caller_info,
        InfoFields.traceback_strings: format_stack(interested_frame),
        InfoFields.coro_parents_strings: coro_parents_strings,
    }
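

# A custom info gatherer must follow the same contract as `default_info_gatherer`:
# take a stack depth, return an `InfoFields`-keyed dict. Minimal sketch (the name
# `my_info_gatherer` is hypothetical; it gathers only a subset of the fields):
#
#     def my_info_gatherer(depth: int) -> Dict[str, Any]:
#         interested_frame = frame(depth + 1)
#         return {
#             InfoFields.current_time: datetime.now(),
#             InfoFields.file_name: interested_frame.f_code.co_filename,
#             InfoFields.line_number: interested_frame.f_lineno,
#             InfoFields.caller_info: entity_repr_owner_based(interested_frame),
#         }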


class LogExtended(TypedServiceRequest[None]):
    default__request__type__ = 4

    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None, depth: Optional[int] = 1):
        super().__init__()
        self.info_gatherer: Optional[Callable] = info_gatherer
        self.depth: Optional[int] = depth
    
    def _copy(self) -> 'LogExtended':
        return LogExtended(self.info_gatherer, self.depth)

    def __call__(self, *args, **kwargs) -> 'LogEx':
        if self.info_gatherer is None:
            info = None
        else:
            info = self.info_gatherer(self.depth + 1)
        
        return self._save_to_copy(self.default__request__type__, args, kwargs, info)


LogEx = LogExtended


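# Sketch: a pre-configured `LogEx` request can be reused for many log calls from
# inside a coroutine (assumes a running Cengal scheduler; `i` is the caller's
# `Interface`, as in `LogClient.alog_fast()` below):
#
#     log_request = LogEx(default_info_gatherer, depth=2)
#     await i(log_request('log message', extra='value'))

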
class LogRequest(ServiceRequest):
    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
        return self._save(0, path_to_db_environment)
    
    def sync(self) -> ServiceRequest:
        return self._save(1)

    def add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
        return self._save(2, handler)

    def remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
        return self._save(3, handler)
    
    def log(self, *args, **kwargs) -> ServiceRequest:
        return LogEx[None](default_info_gatherer, depth=2)(*args, **kwargs)

    def connect_to_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
        return self._save(5, logger_instance)

    def disconnect_from_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
        return self._save(6, logger_instance)


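# Sketch: driving the service with `LogRequest` from a coroutine (assumes a
# running scheduler; `i` is the caller's `Interface`; the path is illustrative):
#
#     await i(Log, LogRequest().set_db_environment_path('./log.db'))
#     await i(Log, LogRequest().log('message', detail=42))
#     await i(Log, LogRequest().sync())

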
class Log(TypedService[None], EntityStatsMixin):
    def __init__(self, loop: CoroSchedulerType):
        super(Log, self).__init__(loop)
        self.default_logs_dir: str = 'log.db'
        self.path_to_db_environment = None
        self.app_name_waiter: CoroWrapperBase = None
        self.root_path_to_log_environment_rel: RelativePath = None
        self.db_environment: lmdb.Environment = None
        self.db = None
        self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list()
        self.async_loop = None
        self.log_counter = Counter()
        self.sync_time_interval = 0.5
        self.characters_in_counter = 16
        self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode()
        self.last_sync_time = perf_counter()
        self.force_sync = False
        self.write_locked = False
        self.write_locked_coro_id: Optional[CoroID] = None
        self.periodic_sync_started: bool = False
        self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list()
        self.new_iteration_handlers_num: int = 0
        self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict()
        self.logger: logging.Logger = self._loop.logger
        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
        #                                    Tags.can_use_bytes,
        #                                    Tags.decode_str_as_str,
        #                                    Tags.decode_list_as_list,
        #                                    Tags.decode_bytes_as_bytes,
        #                                    Tags.superficial,
        #                                    Tags.current_platform,
        #                                    Tags.multi_platform),
        #                                   TestDataType.small,
        #                                   0.1)
        self.serializer = Serializer(Serializers.msgspec_messagepack)

        self._request_workers = {
            0: self._on_set_db_environment_path,
            1: self._on_sync,
            2: self._on_add_iteration_handler,
            3: self._on_remove_iteration_handler,
            4: self._on_log_extended,
            5: self._on_connect_to_logger,
            6: self._on_disconnect_from_logger,
        }
        self.inject_handler_to_logger(self._loop.logger)

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        return type(self).__name__, {
            'log counter': self.log_counter._index,
            'current counter state key': self.current_counter_state_key,
        }
    
    def put_log(self, args, kwargs):
        self.log_queue.append((args, kwargs, None))
        # self.make_live
    
    def put_log_ex(self, args, kwargs, info):
        self.log_queue.append((args, kwargs, info))

    def destroy(self):
        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
        # sync environments and close them. Also we need to prevent new requests from being processed. (see DB service)
        logger_instances = list(self.logger_handlers.keys())
        for logger_instance in logger_instances:
            self.eject_handler_from_logger(logger_instance)

        if self.db_environment is not None:
            self.db_environment.close()

    def single_task_registration_or_immediate_processing(
            self, *args, **kwargs) -> Tuple[bool, None, None]:
        result = self.try_resolve_request(*args, **kwargs)
        if result is None:
            coro_info = self.current_caller_coro_info
            coro_worker = coro_info.coro.worker
            if isinstance(coro_worker, GreenletWorkerWrapper):
                coro_worker = coro_worker.worker
            
            caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
            coro_worker_code = get_code(coro_worker)
            info = {
                InfoFields.current_time: datetime.now(),
                InfoFields.perf_counter_time: perf_counter(),
                InfoFields.file_name: coro_worker_code.co_filename,
                InfoFields.line_number: coro_worker_code.co_firstlineno,
                InfoFields.caller_info: caller_info,
                InfoFields.traceback_strings: list(),
                # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id),
                InfoFields.coro_parents_strings: list(),
            }
            self.log_queue.append((args, kwargs, info))
            # self.make_live()

            # TODO: we need to implement a backpressure mechanism here. If we have too many pending requests, we need
            # to put the request into a queue instead of responding immediately. However, this alone will not be enough
            # for direct requests: they need some kind of backpressure mechanism too.
            return True, None, None
        else:
            return result

    def _ensure_default_db_environment(self) -> bool:
        if self.db_environment is None:
            if self.path_to_db_environment is None:
                if self.app_name_waiter is None:
                    async def coro(i: Interface, self: 'Log'):
                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
                        self.path_to_db_environment = RelativePath(app_data_dir_path)(self.default_logs_dir)
                        self._init_db()
                        self.app_name_waiter = None
                        self.make_live()
                    
                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
                
                self.make_dead()
                return False
            else:
                self._init_db()
                return True
        else:
            return True

    def full_processing_iteration(self):
        if not self._ensure_default_db_environment():
            return
        
        self.force_sync = False
        log_queue_buff = self.log_queue
        self.log_queue = type(log_queue_buff)()
        current_time = perf_counter()
        current_time_str = str(current_time)
        for iteration_handler in self.iteration_handlers:
            iteration_handler(self, log_queue_buff, current_time, current_time_str)
        
        self.new_iteration_handlers_num = 0

        def handler():
            with self.db_environment.begin(write=True) as txn:
                for log_info in log_queue_buff:
                    key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode()
                    value = self.serializer.dumps(log_info)
                    txn.put(key, value, db=self.db, dupdata=True, append=True)
                
                txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db)
        
        lmdb_reapplier(self.db_environment, self.db, handler)
        
        self.sync_in_thread_pool()
        
        self.last_sync_time = perf_counter()

        self.make_dead()

    def in_work(self) -> bool:
        result: bool = bool(self.log_queue) \
            or (self.db_environment is None) \
            or (not self.periodic_sync_started) \
            or bool(self.new_iteration_handlers_num) \
            or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
        return self.thrifty_in_work(result)
    
    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
        if self.sync_time_interval > time_since_last_sync_time:
            return True, self.sync_time_interval - time_since_last_sync_time
        else:
            return True, 0

    def _init_db(self):
        self.logger.info(f'Path to Log DB Env: {self.path_to_db_environment}')
        self.db_environment = lmdb.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=2,
                                        map_async=True, lock=False, metasync=False, sync=False, meminit=False)
        self.db = self.db_environment.open_db(b'logs')
        def handler():
            with self.db_environment.begin(write=True) as txn:
                current_counter_state = txn.get(self.current_counter_state_key, db=self.db)
                if current_counter_state is None:
                    txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db)
                else:
                    self.log_counter._index = self.serializer.loads(current_counter_state)
        
        lmdb_reapplier(self.db_environment, self.db, handler)
        self.db_environment.sync(True)

    def _on_set_db_environment_path(self, path_to_db_environment: str):
        if self.write_locked:
            return True, False, None
        
        if self.db_environment is None:
            self.path_to_db_environment = path_to_db_environment
            try:
                self._init_db()
            except:
                exception = get_exception()
                return True, False, exception
            return True, True, None
        else:
            return True, False, None
    
    def _on_sync(self):
        if self.db_environment is None:
            self.make_live()
        else:
            if self.log_queue:
                self.force_sync = True
                self.make_live()
            else:
                # self.db_environment.sync(True)
                self.sync_in_thread_pool()
        
        return True, None, None

    def _on_log_extended(self, args, kwargs, info):
        self.log_queue.append((args, kwargs, info))
        return True, None, None

    def _on_add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]):
        self.iteration_handlers.append(handler)
        self.new_iteration_handlers_num += 1
        return True, None, None
    
    def _on_remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]):
        removed = False
        try:
            self.iteration_handlers.remove(handler)
            removed = True
        except ValueError:
            pass

        return True, removed, None

    def sync_in_thread_pool(self):
        async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool):
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
            else:
                if asyncio_loop is None:
                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
            
            async def sync_db(self, asyncio_loop):
                def sync_worker():
                    self.db_environment.sync(True)
                
                await task_in_thread_pool(asyncio_loop, sync_worker)

            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
            self.write_locked_coro_id = None
            self.write_locked = False
            def make_service_live_for_a_next_sync(self: 'Log'):
                self.periodic_sync_started = False
                self.make_live()
            
            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

        asyncio_loop = None
        need_to_ensure_asyncio_loop = False
        try:
            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
        except AsyncioLoopWasNotSetError:
            need_to_ensure_asyncio_loop = True

        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
        self.write_locked = True
        self.write_locked_coro_id = coro.coro_id
        self.periodic_sync_started = True
    
    def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]:
        if self.db_environment is None:
            return list()
        
        if number is None:
            number = self.log_counter._index + 1
        elif number <= 0:
            return list()
        
        result = list()
        with self.db_environment.begin(db=self.db) as txn:
            cursor = txn.cursor()  # a single cursor: positioning and iteration must share state
            cursor.last()
            for key, value in cursor.iterprev():
                if key == self.current_counter_state_key:
                    continue

                if number <= 0:
                    break

                result.append(self.serializer.loads(value))
                number -= 1
        
        result.reverse()
        return result

    def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool:
        if logger_instance not in self.logger_handlers:
            logger_handler: LoggingHandler = LoggingHandler(self)
            self.logger_handlers[logger_instance] = logger_handler
            logger_instance.addHandler(logger_handler)
            return True
        else:
            return False

    def _on_connect_to_logger(self, logger_instance: logging.Logger):
        result = self.inject_handler_to_logger(logger_instance)
        return True, result, None

    def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool:
        if logger_instance in self.logger_handlers:
            logger_instance.removeHandler(self.logger_handlers[logger_instance])
            del self.logger_handlers[logger_instance]
            return True
        else:
            return False

    def _on_disconnect_from_logger(self, logger_instance: logging.Logger):
        result = self.eject_handler_from_logger(logger_instance)
        return True, result, None


LogExtended.default_service_type = Log
LogRequest.default_service_type = Log

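# With `default_service_type` set, these requests resolve to the `Log` service by
# themselves, so a coroutine may issue e.g. `await i(LogRequest().sync())` without
# naming the service explicitly (sketch; assumes a running scheduler).

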
class LogClient:
    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None) -> None:
        self.info_gatherer: Optional[Callable] = info_gatherer
        # honor a custom gatherer when one is provided; fall back to the default one
        self.log_extended_request: LogEx = LogEx(info_gatherer if info_gatherer is not None else default_info_gatherer, depth=2)
        self.extended: bool = True

    def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
        if self.extended:
            i(self.log_extended_request(*args, **kwargs))
        else:
            i(Log, *args, **kwargs)
        
        return args[0] if args else None

    async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
        if self.extended:
            await i(self.log_extended_request(*args, **kwargs))
        else:
            await i(Log, *args, **kwargs)
        
        return args[0] if args else None

    def log(self, *args, **kwargs) -> Optional[Any]:
        if self.extended:
            current_interface()(self.log_extended_request(*args, **kwargs))
        else:
            current_interface()(Log, *args, **kwargs)
        
        return args[0] if args else None

    async def alog(self, *args, **kwargs) -> Optional[Any]:
        if self.extended:
            await current_interface()(self.log_extended_request(*args, **kwargs))
        else:
            await current_interface()(Log, *args, **kwargs)
        
        return args[0] if args else None

    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
        if self.extended:
            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
        else:
            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
        
        return args[0] if args else None

    plog_fast = put_log_fast

    # async def aput_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs):
    #     if self.extended:
    #         log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
    #         scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
    #     else:
    #         scheduler.get_service_instance_fast(Log).put_log(args, kwargs)

    # aplog_fast = aput_log_fast

    def put_log(self, *args, **kwargs) -> Optional[Any]:
        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)

    plog = put_log

    # async def aput_log(self, *args, **kwargs):
    #     self.put_log_fast(current_coro_scheduler(), *args, **kwargs)

    # aplog = aput_log


default_log_client: LogClient = LogClient(default_info_gatherer)
log_fast = default_log_client.log_fast
alog_fast = default_log_client.alog_fast
log = default_log_client.log
alog = default_log_client.alog
put_log_fast = default_log_client.put_log_fast
plog_fast = default_log_client.plog_fast
put_log = default_log_client.put_log
plog = default_log_client.plog
# aput_log_fast = default_log_client.aput_log_fast
# aplog_fast = default_log_client.aplog_fast
# aput_log = default_log_client.aput_log
# aplog = default_log_client.aplog


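# Convenience sketch for the helpers above (assumes a running scheduler): inside a
# greenlet-based coroutine `log(...)` is synchronous and returns its first positional
# argument, so it can wrap a value in-line; async coroutines use the awaitable variant:
#
#     value = log('something happened', code=42)   # value == 'something happened'
#     value = await alog('something happened')

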
def view_log(path_to_db_environment: Optional[str]=None, file_to_redirect_output: Optional[str]=None):
    if path_to_db_environment is None:
        path_to_db_environment = path_relative_to_current_dir('log.db')

    output_file = None
    if file_to_redirect_output is not None:
        output_file = open(file_to_redirect_output, 'wb')

    try:
        db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2)
        db = db_environment.open_db(b'logs')
        # serializer = best_serializer_for_standard_data((DataFormats.binary,
        #                               Tags.can_use_bytes,
        #                               Tags.decode_str_as_str,
        #                               Tags.decode_list_as_list,
        #                               Tags.decode_bytes_as_bytes, 
        #                               Tags.superficial,
        #                               Tags.current_platform,
        #                               Tags.multi_platform),
        #                              TestDataType.small,
        #                              0.1)
        serializer = Serializer(Serializers.msgspec_messagepack)
        with db_environment.begin() as txn:
            for key, value in txn.cursor(db=db):
                if key == f'{str(0).zfill(16)}'.encode():
                    if output_file is None:
                        print(f'λλλ <<< {serializer.loads(value)} >>>')
                        print()
                    else:
                        output_file.write(f'λλλ <<< {serializer.loads(value)} >>>'.encode())
                        output_file.write('\n'.encode())
                else:
                    args, kwargs, info = serializer.loads(value)
                    if output_file is None:
                        print(f'λ >>>\t{key}: {"~"*8}')
                        print(args_kwargs_to_str(args, kwargs))  # arbitrary log kwargs must not be forwarded to print()
                        print(info)
                        print()
                    else:
                        output_file.write(f'λ >>>\t{key}: {"~"*8}\n'.encode())
                        output_file.write(f'{args_kwargs_to_str(args, kwargs)}\n'.encode())
                        output_file.write('\n'.encode())
    finally:
        if output_file is not None:
            output_file.close()


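# Example (sketch; paths are illustrative): dump a log database to stdout, or
# redirect the dump to a file:
#
#     view_log('./log.db')
#     view_log('./log.db', file_to_redirect_output='./log_dump.txt')

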
def clear_log(path_to_db_environment: Optional[str]=None):
    if path_to_db_environment is None:
        path_to_db_environment = path_relative_to_current_dir('log.db')
    db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2)
    db = db_environment.open_db(b'logs')
    def handler():
        with db_environment.begin(write=True) as txn:
            txn.drop(db=db, delete=False)
    lmdb_reapplier(db_environment, db, handler)


def lmdb_reapplier(environment: lmdb.Environment, db, handler: Callable):
    failed = True
    while failed:
        need_to_drop = False
        try:
            handler()
            failed = False
        except lmdb.MapFullError:
            need_to_drop = True
        
        if need_to_drop:
            environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
            with environment.begin(write=True) as txn:
                txn.drop(db=db, delete=False)


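# Note on `lmdb_reapplier`: it retries `handler` until the write transaction fits,
# growing the LMDB memory map by 2 MiB after each `MapFullError`. Before retrying it
# also empties the target database (`txn.drop(..., delete=False)` clears the entries
# but keeps the database handle), trading stored history for forward progress.

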
class LoggingHandler(logging.Handler):
    def __init__(self, log_service: Log, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_service: Log = log_service
        self.cs: CoroSchedulerType = log_service._loop

    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        interested_frame = frame(7)  # TODO: mostly correct at least for Python 3.8.10 (wrong at least for the `warn()` and `exception()` logger methods)
        try:
            interface: Interface = current_interface()
        except OutsideCoroSchedulerContext:
            interface = None

        if interface is None:
            caller_info = f'FuncName: {record.funcName}'
            coro_parents_strings = list()
        else:
            coro_worker = interface._coro.worker
            if isinstance(coro_worker, GreenletWorkerWrapper):
                coro_worker = coro_worker.worker
            
            caller_info = f'FuncName: {record.funcName}; CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
            coro_parents_strings = get_coro_parents_strings(interface.coro_id)
        self.log_service.put_log_ex((log_entry, ), dict(), {
            InfoFields.current_time: datetime.fromtimestamp(record.created),
            InfoFields.perf_counter_time: perf_counter(),
            InfoFields.file_name: record.filename,
            InfoFields.line_number: record.lineno,
            InfoFields.caller_info: caller_info,
            InfoFields.traceback_strings: format_stack(interested_frame),
            InfoFields.coro_parents_strings: coro_parents_strings,
            InfoFields.logging_level: record.levelname,
        })
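

# Sketch: routing a standard-library logger into the Log service (assumes a running
# scheduler; `i` is the caller's `Interface`; the logger name is illustrative):
#
#     my_logger = logging.getLogger('my_app')
#     await i(Log, LogRequest().connect_to_logger(my_logger))
#     my_logger.warning('captured into the LMDB log as well')
#     await i(Log, LogRequest().disconnect_from_logger(my_logger))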
class InfoFields(enum.IntEnum):
111class InfoFields(IntEnum):
112    current_time = 0
113    file_name = 1
114    line_number = 2
115    caller_info = 3
116    traceback_strings = 4
117    perf_counter_time = 5
118    coro_parents_strings = 6
119    logging_level = 7

An enumeration.

current_time = <InfoFields.current_time: 0>
file_name = <InfoFields.file_name: 1>
line_number = <InfoFields.line_number: 2>
caller_info = <InfoFields.caller_info: 3>
traceback_strings = <InfoFields.traceback_strings: 4>
perf_counter_time = <InfoFields.perf_counter_time: 5>
coro_parents_strings = <InfoFields.coro_parents_strings: 6>
logging_level = <InfoFields.logging_level: 7>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
def default_info_gatherer(depth: int) -> Dict[str, Any]:
122def default_info_gatherer(depth: int) -> Dict[str, Any]:
123    interested_frame = frame(depth + 1)
124    try:
125        interface: Interface = current_interface()
126    except OutsideCoroSchedulerContext:
127        interface = None
128
129    if interface is None:
130        caller_info = entity_repr_owner_based(interested_frame)
131        coro_parents_strings = list()
132    else:
133        coro_worker = interface._coro.worker
134        if isinstance(coro_worker, GreenletWorkerWrapper):
135            coro_worker = coro_worker.worker
136        
137        caller_info = f'CoroID: {interface.coro_id:10}; Type: {"Awaitable" if isinstance(interface._coro, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
138        coro_parents_strings = get_coro_parents_strings(interface.coro_id)
139    
140    return {
141        InfoFields.current_time: datetime.now(),
142        InfoFields.perf_counter_time: perf_counter(),
143        InfoFields.file_name: interested_frame.f_code.co_filename,
144        InfoFields.line_number: interested_frame.f_lineno,
145        InfoFields.caller_info: caller_info,
146        InfoFields.traceback_strings: format_stack(interested_frame),
147        InfoFields.coro_parents_strings: coro_parents_strings,
148    }
class LogExtended(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedServiceRequest[NoneType]):
151class LogExtended(TypedServiceRequest[None]):
152    default__request__type__ = 4
153
154    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None, depth: Optional[int] = 1):
155        super().__init__()
156        self.info_gatherer: Optional[Callable] = info_gatherer
157        self.depth: Optional[int] = depth
158    
159    def _copy(self) -> 'LogExtended':
160        return LogExtended(self.info_gatherer, self.depth)
161
162    def __call__(self, *args, **kwargs) -> 'LogEx':
163        if self.info_gatherer is None:
164            info = None
165        else:
166            info = self.info_gatherer(self.depth + 1)
167        
168        return self._save_to_copy(self.default__request__type__, args, kwargs, info)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

LogExtended( info_gatherer: Union[Callable[[int], Dict[str, Any]], NoneType] = None, depth: Union[int, NoneType] = 1)
154    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None, depth: Optional[int] = 1):
155        super().__init__()
156        self.info_gatherer: Optional[Callable] = info_gatherer
157        self.depth: Optional[int] = depth
default__request__type__: int = 4
info_gatherer: Union[Callable, NoneType]
depth: Union[int, NoneType]
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'Log'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
LogEx = <class 'LogExtended'>
class Log(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[NoneType], cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
197class Log(TypedService[None], EntityStatsMixin):
198    def __init__(self, loop: CoroSchedulerType):
199        super(Log, self).__init__(loop)
200        self.default_logs_dir: str = 'log.db'
201        self.path_to_db_environment = None
202        self.app_name_waiter: CoroWrapperBase = None
203        self.root_path_to_log_environment_rel: RelativePath = None
204        self.db_environment: lmdb.Environment = None
205        self.db = None
206        self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list()
207        self.async_loop = None
208        self.log_counter = Counter()
209        self.sync_time_interval = 0.5
210        self.characters_in_counter = 16
211        self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode()
212        self.last_sync_time = perf_counter()
213        self.force_sync = False
214        self.write_locked = False
215        self.write_locked_coro_id: Optional[CoroID] = None
216        self.periodic_sync_started: bool = False
217        self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list()
218        self.new_iteration_handlers_num: int = 0
219        self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict()
220        self.logger: logging.Logger = self._loop.logger
221        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
222        #                                    Tags.can_use_bytes,
223        #                                    Tags.decode_str_as_str,
224        #                                    Tags.decode_list_as_list,
225        #                                    Tags.decode_bytes_as_bytes,
226        #                                    Tags.superficial,
227        #                                    Tags.current_platform,
228        #                                    Tags.multi_platform),
229        #                                   TestDataType.small,
230        #                                   0.1)
231        self.serializer = Serializer(Serializers.msgspec_messagepack)
232
233        self._request_workers = {
234            0: self._on_set_db_environment_path,
235            1: self._on_sync,
236            2: self._on_add_iteration_handler,
237            3: self._on_remove_iteration_handler,
238            4: self._on_log_extended,
239            5: self._on_connect_to_logger,
240            6: self._on_disconnect_from_logger,
241        }
242        self.inject_handler_to_logger(self._loop.logger)
243
244    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
245        return type(self).__name__, {
246            'log counter': self.log_counter._index,
247            'current counter state key': self.current_counter_state_key,
248        }
249    
250    def put_log(self, args, kwargs):
251        self.log_queue.append((args, kwargs, None))
252        # self.make_live
253    
254    def put_log_ex(self, args, kwargs, info):
255        self.log_queue.append((args, kwargs, info))
256
257    def destroy(self):
258        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
259        # sync envirounments and close them. Also we need to prevent new requests from being processed. (see DB service)
260        loggers_instancess = list(self.logger_handlers.keys())
261        for logger_instance in loggers_instancess:
262            self.eject_handler_from_logger(logger_instance)
263
264        if self.db_environment is not None:
265            self.db_environment.close()
266
267    def single_task_registration_or_immediate_processing(
268            self, *args, **kwargs) -> Tuple[bool, None, None]:
269        result = self.try_resolve_request(*args, **kwargs)
270        if result is None:
271            coro_info = self.current_caller_coro_info
272            coro_worker = coro_info.coro.worker
273            if isinstance(coro_worker, GreenletWorkerWrapper):
274                coro_worker = coro_worker.worker
275            
276            caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
277            coro_worker_code = get_code(coro_worker)
278            info = {
279                InfoFields.current_time: datetime.now(),
280                InfoFields.perf_counter_time: perf_counter(),
281                InfoFields.file_name: coro_worker_code.co_filename,
282                InfoFields.line_number: coro_worker_code.co_firstlineno,
283                InfoFields.caller_info: caller_info,
284                InfoFields.traceback_strings: list(),
285                # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id),
286                InfoFields.coro_parents_strings: list(),
287            }
288            self.log_queue.append((args, kwargs, info))
289            # self.make_live()
290
291            # TODO: we need to implement backpressure mechanism here. If we have too many pending requests, we need to put request to queue instead of responding immediately.
292            # However this will not be enough for a direct requests. We need to implement some kind of backpressure mechanism for direct requests too.
293            return True, None, None
294        else:
295            return result
296
297    def _ensure_default_db_environment(self) -> bool:
298        if self.db_environment is None:
299            if self.path_to_db_environment is None:
300                if self.app_name_waiter is None:
301                    async def coro(i: Interface, self: 'Log'):
302                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
303                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
304                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
305                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
306                        self.path_to_db_environment = RelativePath(app_data_dir_path)(self.default_logs_dir)
307                        self._init_db()
308                        self.app_name_waiter = None
309                        self.make_live()
310                    
311                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
312                
313                self.make_dead()
314                return False
315            else:
316                self._init_db()
317                return True
318        else:
319            return True
320
321    def full_processing_iteration(self):
322        if not self._ensure_default_db_environment():
323            return
324        
325        self.force_sync = False
326        log_queue_buff = self.log_queue
327        self.log_queue = type(log_queue_buff)()
328        current_time = perf_counter()
329        current_time_str = str(current_time)
330        for iteration_handler in self.iteration_handlers:
331            iteration_handler(self, log_queue_buff, current_time, current_time_str)
332        
333        self.new_iteration_handlers_num = 0
334
335        def handler():
336            with self.db_environment.begin(write=True) as txn:
337                for log_info in log_queue_buff:
338                    key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode()
339                    value = self.serializer.dumps(log_info)
340                    txn.put(key, value, db=self.db, dupdata=True, append=True)
341                
342                txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db)
343        
344        lmdb_reapplier(self.db_environment, self.db, handler)
345        
346        self.sync_in_thread_pool()
347        
348        self.last_sync_time = perf_counter()
349
350        self.make_dead()
351
352    def in_work(self) -> bool:
353        result: bool = bool(self.log_queue) \
354            or (self.db_environment is None) \
355            or (not self.periodic_sync_started) \
356            or self.new_iteration_handlers_num \
357            or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
358        return self.thrifty_in_work(result)
359    
360    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
361        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
362        if self.sync_time_interval > time_since_last_sync_time:
363            return True, self.sync_time_interval - time_since_last_sync_time
364        else:
365            return True, 0
366
367    def _init_db(self):
368        self.logger.info(f'Path to Log DB Env: {self.path_to_db_environment}')
369        self.db_environment = lmdb.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=2,
370                                        map_async=True, lock=False, metasync=False, sync=False, meminit=False)
371        self.db = self.db_environment.open_db(b'logs')
372        def handler():
373            with self.db_environment.begin(write=True) as txn:
374                current_counter_state = txn.get(self.current_counter_state_key, db=self.db)
375                if current_counter_state is None:
376                    txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db)
377                else:
378                    self.log_counter._index = self.serializer.loads(current_counter_state)
379        
380        lmdb_reapplier(self.db_environment, self.db, handler)
381        self.db_environment.sync(True)
382
383    def _on_set_db_environment_path(self, path_to_db_environment: str):
384        if self.write_locked:
385            return True, False, None
386        
387        if self.db_environment is None:
388            self.path_to_db_environment = path_to_db_environment
389            try:
390                self._init_db()
391            except:
392                exception = get_exception()
393                return True, False, exception
394            return True, True, None
395        else:
396            return True, False, None
397    
398    def _on_sync(self):
399        if self.db_environment is None:
400            self.make_live()
401        else:
402            if self.log_queue:
403                self.force_sync = True
404                self.make_live()
405            else:
406                # self.db_environment.sync(True)
407                self.sync_in_thread_pool()
408        
409        return True, None, None
410
411    def _on_log_extended(self, args, kwargs, info):
412        self.log_queue.append((args, kwargs, info))
413        return True, None, None
414
415    def _on_add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]):
416        self.iteration_handlers.append(handler)
417        self.new_iteration_handlers_num += 1
418        return True, None, None
419    
420    def _on_remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]):
421        removed = False
422        try:
423            self.iteration_handlers.remove(handler)
424            removed = True
425        except ValueError:
426            pass
427
428        return True, removed, None
429
430    def sync_in_thread_pool(self):
431        async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool):
432            if need_to_ensure_asyncio_loop:
433                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
434            else:
435                if asyncio_loop is None:
436                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
437            
438            async def sync_db(self, asyncio_loop):
439                def sync_worker():
440                    self.db_environment.sync(True)
441                
442                await task_in_thread_pool(asyncio_loop, sync_worker)
443
444            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
445            self.write_locked_coro_id = None
446            self.write_locked = False
447            def make_service_live_for_a_next_sync(self: 'Log'):
448                self.periodic_sync_started = False
449                self.make_live()
450            
451            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)
452
453        asyncio_loop = None
454        need_to_ensure_asyncio_loop = False
455        try:
456            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
457        except AsyncioLoopWasNotSetError:
458            need_to_ensure_asyncio_loop = True
459
460        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
461        self.write_locked = True
462        self.write_locked_coro_id = coro.coro_id
463        self.periodic_sync_started = True
464    
465    def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]:
466        if self.db_environment is None:
467            return list()
468        
469        if number is None:
470            number = self.log_counter._index + 1
471        elif number <= 0:
472            return list()
473        
474        result = list()
475        with self.db_environment.begin(db=self.db) as txn:
476            txn.cursor().last()
477            for key, value in txn.cursor().iterprev():
478                if key == self.current_counter_state_key:
479                    continue
480
481                if number <= 0:
482                    break
483
484                result.append(self.serializer.loads(value))
485                number -= 1
486        
487        result.reverse()
488        return result
489
490    def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool:
491        if logger_instance not in self.logger_handlers:
492            logger_handler: LoggingHandler = LoggingHandler(self)
493            self.logger_handlers[logger_instance] = logger_handler
494            logger_instance.addHandler(logger_handler)
495            return True
496        else:
497            return False
498
499    def _on_connect_to_logger(self, logger_instance: logging.Logger):
500        result = self.inject_handler_to_logger(logger_instance)
501        return True, result, None
502
503    def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool:
504        if logger_instance in self.logger_handlers:
505            logger_instance.removeHandler(self.logger_handlers[logger_instance])
506            del self.logger_handlers[logger_instance]
507            return True
508        else:
509            return False
510
511    def _on_disconnect_from_logger(self, logger_instance: logging.Logger):
512        result = self.eject_handler_from_logger(logger_instance)
513        return True, result, None

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Log( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
198    def __init__(self, loop: CoroSchedulerType):
199        super(Log, self).__init__(loop)
200        self.default_logs_dir: str = 'log.db'
201        self.path_to_db_environment = None
202        self.app_name_waiter: CoroWrapperBase = None
203        self.root_path_to_log_environment_rel: RelativePath = None
204        self.db_environment: lmdb.Environment = None
205        self.db = None
206        self.log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]] = list()
207        self.async_loop = None
208        self.log_counter = Counter()
209        self.sync_time_interval = 0.5
210        self.characters_in_counter = 16
211        self.current_counter_state_key = f'{str(0).zfill(self.characters_in_counter)}'.encode()
212        self.last_sync_time = perf_counter()
213        self.force_sync = False
214        self.write_locked = False
215        self.write_locked_coro_id: Optional[CoroID] = None
216        self.periodic_sync_started: bool = False
217        self.iteration_handlers: List[Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]] = list()
218        self.new_iteration_handlers_num: int = 0
219        self.logger_handlers: Dict[logging.Logger, LoggingHandler] = dict()
220        self.logger: logging.Logger = self._loop.logger
221        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
222        #                                    Tags.can_use_bytes,
223        #                                    Tags.decode_str_as_str,
224        #                                    Tags.decode_list_as_list,
225        #                                    Tags.decode_bytes_as_bytes,
226        #                                    Tags.superficial,
227        #                                    Tags.current_platform,
228        #                                    Tags.multi_platform),
229        #                                   TestDataType.small,
230        #                                   0.1)
231        self.serializer = Serializer(Serializers.msgspec_messagepack)
232
233        self._request_workers = {
234            0: self._on_set_db_environment_path,
235            1: self._on_sync,
236            2: self._on_add_iteration_handler,
237            3: self._on_remove_iteration_handler,
238            4: self._on_log_extended,
239            5: self._on_connect_to_logger,
240            6: self._on_disconnect_from_logger,
241        }
242        self.inject_handler_to_logger(self._loop.logger)
default_logs_dir: str
path_to_db_environment
app_name_waiter: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase
root_path_to_log_environment_rel: cengal.file_system.path_manager.versions.v_0.path_manager.RelativePath
db_environment: Environment
db
log_queue: List[Tuple[Tuple, Dict, Dict[str, Any]]]
async_loop
log_counter
sync_time_interval
characters_in_counter
current_counter_state_key
last_sync_time
force_sync
write_locked
write_locked_coro_id: Union[int, NoneType]
periodic_sync_started: bool
iteration_handlers: list[collections.abc.Callable[Log, typing.List[typing.Tuple[typing.Tuple, typing.Dict]], float, str, NoneType]]
new_iteration_handlers_num: int
logger_handlers: Dict[logging.Logger, cengal.parallel_execution.coroutines.coro_standard_services.log.versions.v_0.log.LoggingHandler]
logger: logging.Logger
serializer
def put_log(self, args, kwargs):
250    def put_log(self, args, kwargs):
251        self.log_queue.append((args, kwargs, None))
252        # self.make_live
def put_log_ex(self, args, kwargs, info):
254    def put_log_ex(self, args, kwargs, info):
255        self.log_queue.append((args, kwargs, info))
def destroy(self):
257    def destroy(self):
258        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
259        # sync environments and close them. Also we need to prevent new requests from being processed. (see DB service)
260        logger_instances = list(self.logger_handlers.keys())
261        for logger_instance in logger_instances:
262            self.eject_handler_from_logger(logger_instance)
263
264        if self.db_environment is not None:
265            self.db_environment.close()
def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, NoneType, NoneType]:
267    def single_task_registration_or_immediate_processing(
268            self, *args, **kwargs) -> Tuple[bool, None, None]:
269        result = self.try_resolve_request(*args, **kwargs)
270        if result is None:
271            coro_info = self.current_caller_coro_info
272            coro_worker = coro_info.coro.worker
273            if isinstance(coro_worker, GreenletWorkerWrapper):
274                coro_worker = coro_worker.worker
275            
276            caller_info = f'CoroID: {coro_info.coro_id:10}; Type: {"Awaitable" if issubclass(coro_info.coro_type, CoroWrapperAsyncAwait) else "Greenlet"}; Worker: {entity_repr_owner_based(coro_worker)}'
277            coro_worker_code = get_code(coro_worker)
278            info = {
279                InfoFields.current_time: datetime.now(),
280                InfoFields.perf_counter_time: perf_counter(),
281                InfoFields.file_name: coro_worker_code.co_filename,
282                InfoFields.line_number: coro_worker_code.co_firstlineno,
283                InfoFields.caller_info: caller_info,
284                InfoFields.traceback_strings: list(),
285                # InfoFields.coro_parents_strings: get_coro_parents_strings(coro_info.coro_id),
286                InfoFields.coro_parents_strings: list(),
287            }
288            self.log_queue.append((args, kwargs, info))
289            # self.make_live()
290
291            # TODO: we need to implement a backpressure mechanism here. If we have too many pending requests, we need to put the request into a queue instead of responding immediately.
292            # However, this will not be enough for direct requests. We need to implement some kind of backpressure mechanism for direct requests too.
293            return True, None, None
294        else:
295            return result
def full_processing_iteration(self):
321    def full_processing_iteration(self):
322        if not self._ensure_default_db_environment():
323            return
324        
325        self.force_sync = False
326        log_queue_buff = self.log_queue
327        self.log_queue = type(log_queue_buff)()
328        current_time = perf_counter()
329        current_time_str = str(current_time)
330        for iteration_handler in self.iteration_handlers:
331            iteration_handler(self, log_queue_buff, current_time, current_time_str)
332        
333        self.new_iteration_handlers_num = 0
334
335        def handler():
336            with self.db_environment.begin(write=True) as txn:
337                for log_info in log_queue_buff:
338                    key = f'{str(self.log_counter.get()).zfill(self.characters_in_counter)}__{current_time_str}'.encode()
339                    value = self.serializer.dumps(log_info)
340                    txn.put(key, value, db=self.db, dupdata=True, append=True)
341                
342                txn.put(self.current_counter_state_key, self.serializer.dumps(self.log_counter._index), db=self.db)
343        
344        lmdb_reapplier(self.db_environment, self.db, handler)
345        
346        self.sync_in_thread_pool()
347        
348        self.last_sync_time = perf_counter()
349
350        self.make_dead()
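Each record is stored under a key of the form '<counter zero-padded to characters_in_counter>__<perf_counter() timestamp>', so keys sort in insertion order. A small decoding sketch (the concrete key value is hypothetical):

    key = b'0000000000000042__12345.678'          # hypothetical stored key
    counter_part, time_part = key.decode().split('__')
    assert int(counter_part) == 42                # insertion counter
    assert float(time_part) == 12345.678          # perf_counter() value at write time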
def in_work(self) -> bool:
352    def in_work(self) -> bool:
353        result: bool = bool(self.log_queue) \
354            or (self.db_environment is None) \
355            or (not self.periodic_sync_started) \
356            or self.new_iteration_handlers_num \
357            or (self.force_sync or ((not self.write_locked) and bool(self.log_queue) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
358        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution.

Returns: bool: True if the service has pending work (queued log entries, an uninitialized database environment, freshly added iteration handlers, or an overdue periodic sync) and should stay alive.

def time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
360    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
361        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
362        if self.sync_time_interval > time_since_last_sync_time:
363            return True, self.sync_time_interval - time_since_last_sync_time
364        else:
365            return True, 0
def sync_in_thread_pool(self):
430    def sync_in_thread_pool(self):
431        async def sync_db_coro(i: Interface, self: 'Log', asyncio_loop, need_to_ensure_asyncio_loop: bool):
432            if need_to_ensure_asyncio_loop:
433                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
434            else:
435                if asyncio_loop is None:
436                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
437            
438            async def sync_db(self, asyncio_loop):
439                def sync_worker():
440                    self.db_environment.sync(True)
441                
442                await task_in_thread_pool(asyncio_loop, sync_worker)
443
444            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
445            self.write_locked_coro_id = None
446            self.write_locked = False
447            def make_service_live_for_a_next_sync(self: 'Log'):
448                self.periodic_sync_started = False
449                self.make_live()
450            
451            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)
452
453        asyncio_loop = None
454        need_to_ensure_asyncio_loop = False
455        try:
456            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
457        except AsyncioLoopWasNotSetError:
458            need_to_ensure_asyncio_loop = True
459
460        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
461        self.write_locked = True
462        self.write_locked_coro_id = coro.coro_id
463        self.periodic_sync_started = True
def get_last_n_logs( self, number: Union[NoneType, int] = None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]:
465    def get_last_n_logs(self, number: Union[None, int]=None) -> List[Tuple[Tuple, Dict, Dict[str, Any]]]:
466        if self.db_environment is None:
467            return list()
468        
469        if number is None:
470            number = self.log_counter._index + 1
471        elif number <= 0:
472            return list()
473        
474        result = list()
475        with self.db_environment.begin(db=self.db) as txn:
476            cursor = txn.cursor()  # one cursor is enough: an unpositioned cursor iterates from the last key
477            for key, value in cursor.iterprev():
478                if key == self.current_counter_state_key:
479                    continue
480
481                if number <= 0:
482                    break
483
484                result.append(self.serializer.loads(value))
485                number -= 1
486        
487        result.reverse()
488        return result
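A minimal usage sketch (not from the source), assuming a scheduler is already running; get_service_instance_fast is used the same way by LogClient.put_log_fast below:

    log_service = current_coro_scheduler().get_service_instance_fast(Log)
    for args, kwargs, info in log_service.get_last_n_logs(10):  # ten newest records
        print(*args)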
def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool:
490    def inject_handler_to_logger(self, logger_instance: logging.Logger) -> bool:
491        if logger_instance not in self.logger_handlers:
492            logger_handler: LoggingHandler = LoggingHandler(self)
493            self.logger_handlers[logger_instance] = logger_handler
494            logger_instance.addHandler(logger_handler)
495            return True
496        else:
497            return False
def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool:
503    def eject_handler_from_logger(self, logger_instance: logging.Logger) -> bool:
504        if logger_instance in self.logger_handlers:
505            logger_instance.removeHandler(self.logger_handlers[logger_instance])
506            del self.logger_handlers[logger_instance]
507            return True
508        else:
509            return False
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
get_entity_stats
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
is_low_latency
make_live
make_dead
service_id_impl
service_id
class LogRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
174class LogRequest(ServiceRequest):
175    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
176        return self._save(0, path_to_db_environment)
177    
178    def sync(self) -> ServiceRequest:
179        return self._save(1)
180
181    def add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
182        return self._save(2, handler)
183
184    def remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
185        return self._save(3, handler)
186    
187    def log(self, *args, **kwargs) -> ServiceRequest:
188        return LogEx[None](default_info_gatherer, depth=2)(*args, **kwargs)
189
190    def connect_to_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
191        return self._save(5, logger_instance)
192
193    def disconnect_from_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
194        return self._save(6, logger_instance)
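A minimal sketch of issuing these requests from inside a coroutine already scheduled under the Cengal coro scheduler (the coroutine body and the path are hypothetical):

    from cengal.parallel_execution.coroutines.coro_scheduler import Interface

    def my_coro(i: Interface):
        i(Log, LogRequest().set_db_environment_path('./my_logs.db'))  # hypothetical path
        i(Log, 'plain log entry')                                     # positional args are logged directly
        i(Log, LogRequest().sync())                                   # force a flush to disk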
def set_db_environment_path( self, path_to_db_environment: str) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
175    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
176        return self._save(0, path_to_db_environment)
def sync( self) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
178    def sync(self) -> ServiceRequest:
179        return self._save(1)
def add_iteration_handler( self, handler: collections.abc.Callable[[Log, typing.List[typing.Tuple[typing.Tuple, typing.Dict]], float, str], NoneType]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
181    def add_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
182        return self._save(2, handler)
def remove_iteration_handler( self, handler: collections.abc.Callable[[Log, typing.List[typing.Tuple[typing.Tuple, typing.Dict]], float, str], NoneType]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
184    def remove_iteration_handler(self, handler: Callable[['Log', List[Tuple[Tuple, Dict]], float, str], None]) -> ServiceRequest:
185        return self._save(3, handler)
def log( self, *args, **kwargs) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
187    def log(self, *args, **kwargs) -> ServiceRequest:
188        return LogEx[None](default_info_gatherer, depth=2)(*args, **kwargs)
def connect_to_logger( self, logger_instance: logging.Logger) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
190    def connect_to_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
191        return self._save(5, logger_instance)
def disconnect_from_logger( self, logger_instance: logging.Logger) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
193    def disconnect_from_logger(self, logger_instance: logging.Logger) -> ServiceRequest:
194        return self._save(6, logger_instance)
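connect_to_logger injects a LoggingHandler into a standard logging.Logger so that its records also flow into this service. A hedged sketch (the logger name is hypothetical):

    import logging

    def my_coro(i: Interface):
        app_logger = logging.getLogger('my_app')
        i(Log, LogRequest().connect_to_logger(app_logger))
        app_logger.warning('captured by the Log service as well')
        i(Log, LogRequest().disconnect_from_logger(app_logger))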
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'Log'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
def view_log( path_to_db_environment: Union[str, NoneType] = None, file_to_redirect_output: Union[str, NoneType] = None):
604def view_log(path_to_db_environment: Optional[str]=None, file_to_redirect_output: Optional[str]=None):
605    if path_to_db_environment is None:
606        path_to_db_environment = path_relative_to_current_dir('log.db')
607
608    output_file = None
609    if file_to_redirect_output is not None:
610        output_file = open(file_to_redirect_output, 'wb')
611
612    try:
613        db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2)
614        db = db_environment.open_db(b'logs')
615        # serializer = best_serializer_for_standard_data((DataFormats.binary,
616        #                               Tags.can_use_bytes,
617        #                               Tags.decode_str_as_str,
618        #                               Tags.decode_list_as_list,
619        #                               Tags.decode_bytes_as_bytes, 
620        #                               Tags.superficial,
621        #                               Tags.current_platform,
622        #                               Tags.multi_platform),
623        #                              TestDataType.small,
624        #                              0.1)
625        serializer = Serializer(Serializers.msgspec_messagepack)
626        with db_environment.begin() as txn:
627            for key, value in txn.cursor(db=db):
628                if key == f'{str(0).zfill(16)}'.encode():
629                    if output_file is None:
630                        print(f'λλλ <<< {serializer.loads(value)} >>>')
631                        print()
632                    else:
633                        output_file.write(f'λλλ <<< {serializer.loads(value)} >>>'.encode())
634                        output_file.write('\n'.encode())
635                else:
636                    args, kwargs, info = serializer.loads(value)
637                    if output_file is None:
638                        print(f'λ >>>\t{key}: {"~"*8}')
639                        print(*args, **kwargs)
640                        print(info)
641                        print()
642                    else:
643                        output_file.write(f'λ >>>\t{key}: {"~"*8}\n'.encode())
644                        output_file.write(f'{args_kwargs_to_str(args, kwargs)}\n'.encode())
645                        output_file.write('\n'.encode())
646    finally:
647        if output_file is not None:
648            output_file.close()
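A minimal usage sketch (both paths are hypothetical):

    view_log()                                # reads ./log.db relative to the current directory
    view_log('./my_logs.db', 'log_dump.txt')  # explicit database, output redirected to a file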
def clear_log(path_to_db_environment: Union[str, NoneType] = None):
651def clear_log(path_to_db_environment: Optional[str]=None):
652    if path_to_db_environment is None:
653        path_to_db_environment = path_relative_to_current_dir('log.db')
654    db_environment = lmdb.open(path_to_db_environment, map_size=20 * 1024 ** 2, writemap=True, max_dbs=2)
655    db = db_environment.open_db(b'logs')
656    def handler():
657        with db_environment.begin(write=True) as txn:
658            txn.drop(db=db, delete=False)
659    lmdb_reapplier(db_environment, db, handler)
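A matching sketch for clearing a database (the explicit path is hypothetical); txn.drop(db, delete=False) empties the logs database without deleting it:

    clear_log()                 # empties ./log.db relative to the current directory
    clear_log('./my_logs.db')   # hypothetical explicit environment path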
class LogClient:
520class LogClient:
521    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None) -> None:
522        self.info_gatherer: Optional[Callable] = info_gatherer
523        self.log_extended_request: LogEx = LogEx(default_info_gatherer, depth=2)
524        self.extended: bool = True
525
526    def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
527        if self.extended:
528            i(self.log_extended_request(*args, **kwargs))
529        else:
530            i(Log, *args, **kwargs)
531        
532        return args[0] if args else None
533
534    async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
535        if self.extended:
536            await i(self.log_extended_request(*args, **kwargs))
537        else:
538            await i(Log, *args, **kwargs)
539        
540        return args[0] if args else None
541
542    def log(self, *args, **kwargs) -> Optional[Any]:
543        if self.extended:
544            current_interface()(self.log_extended_request(*args, **kwargs))
545        else:
546            current_interface()(Log, *args, **kwargs)
547        
548        return args[0] if args else None
549
550    async def alog(self, *args, **kwargs) -> Optional[Any]:
551        if self.extended:
552            await current_interface()(self.log_extended_request(*args, **kwargs))
553        else:
554            await current_interface()(Log, *args, **kwargs)
555        
556        return args[0] if args else None
557
558    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
559        if self.extended:
560            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
561            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
562        else:
563            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
564        
565        return args[0] if args else None
566
567    plog_fast = put_log_fast
568
569    # async def aput_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs):
570    #     if self.extended:
571    #         log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
572    #         scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
573    #     else:
574    #         scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
575
576    # aplog_fast = aput_log_fast
577
578    def put_log(self, *args, **kwargs) -> Optional[Any]:
579        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
580
581    plog = put_log
582
583    # async def aput_log(self, *args, **kwargs):
584    #     self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
585
586    # aplog = aput_log
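The module-level log, alog, log_fast, alog_fast, put_log, and plog exported from this module are these same methods bound to default_log_client. A minimal sketch (coroutine bodies are hypothetical):

    def my_coro(i: Interface):
        log_fast(i, 'explicit interface')  # logs through the given Interface
        log('implicit interface')          # resolves current_interface() internally
        put_log('queued directly')         # appends to the queue via the current scheduler

    async def my_async_coro(i: Interface):
        await alog_fast(i, 'async, explicit interface')
        await alog('async, implicit interface')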
LogClient( info_gatherer: Union[Callable[[int], Dict[str, Any]], NoneType] = None)
521    def __init__(self, info_gatherer: Optional[Callable[[int], Dict[str, Any]]] = None) -> None:
522        self.info_gatherer: Optional[Callable] = info_gatherer
523        self.log_extended_request: LogEx = LogEx(default_info_gatherer, depth=2)
524        self.extended: bool = True
info_gatherer: Union[Callable, NoneType]
log_extended_request: LogExtended
extended: bool
def log_fast( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, *args, **kwargs) -> Union[Any, NoneType]:
526    def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
527        if self.extended:
528            i(self.log_extended_request(*args, **kwargs))
529        else:
530            i(Log, *args, **kwargs)
531        
532        return args[0] if args else None
async def alog_fast( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, *args, **kwargs) -> Union[Any, NoneType]:
534    async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
535        if self.extended:
536            await i(self.log_extended_request(*args, **kwargs))
537        else:
538            await i(Log, *args, **kwargs)
539        
540        return args[0] if args else None
def log(self, *args, **kwargs) -> Union[Any, NoneType]:
542    def log(self, *args, **kwargs) -> Optional[Any]:
543        if self.extended:
544            current_interface()(self.log_extended_request(*args, **kwargs))
545        else:
546            current_interface()(Log, *args, **kwargs)
547        
548        return args[0] if args else None
async def alog(self, *args, **kwargs) -> Union[Any, NoneType]:
550    async def alog(self, *args, **kwargs) -> Optional[Any]:
551        if self.extended:
552            await current_interface()(self.log_extended_request(*args, **kwargs))
553        else:
554            await current_interface()(Log, *args, **kwargs)
555        
556        return args[0] if args else None
def put_log_fast( self, scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], *args, **kwargs) -> Union[Any, NoneType]:
558    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
559        if self.extended:
560            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
561            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
562        else:
563            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
564        
565        return args[0] if args else None
def plog_fast( self, scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], *args, **kwargs) -> Union[Any, NoneType]:
558    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
559        if self.extended:
560            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
561            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
562        else:
563            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
564        
565        return args[0] if args else None
def put_log(self, *args, **kwargs) -> Union[Any, NoneType]:
578    def put_log(self, *args, **kwargs) -> Optional[Any]:
579        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
def plog(self, *args, **kwargs) -> Union[Any, NoneType]:
578    def put_log(self, *args, **kwargs) -> Optional[Any]:
579        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
default_log_client: LogClient = <LogClient object>
def log_fast( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, *args, **kwargs) -> Union[Any, NoneType]:
526    def log_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
527        if self.extended:
528            i(self.log_extended_request(*args, **kwargs))
529        else:
530            i(Log, *args, **kwargs)
531        
532        return args[0] if args else None
async def alog_fast( i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, *args, **kwargs) -> Union[Any, NoneType]:
534    async def alog_fast(self, i: Interface, *args, **kwargs) -> Optional[Any]:
535        if self.extended:
536            await i(self.log_extended_request(*args, **kwargs))
537        else:
538            await i(Log, *args, **kwargs)
539        
540        return args[0] if args else None
def log(*args, **kwargs) -> Union[Any, NoneType]:
542    def log(self, *args, **kwargs) -> Optional[Any]:
543        if self.extended:
544            current_interface()(self.log_extended_request(*args, **kwargs))
545        else:
546            current_interface()(Log, *args, **kwargs)
547        
548        return args[0] if args else None
async def alog(*args, **kwargs) -> Union[Any, NoneType]:
550    async def alog(self, *args, **kwargs) -> Optional[Any]:
551        if self.extended:
552            await current_interface()(self.log_extended_request(*args, **kwargs))
553        else:
554            await current_interface()(Log, *args, **kwargs)
555        
556        return args[0] if args else None
def put_log_fast( scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], *args, **kwargs) -> Union[Any, NoneType]:
558    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
559        if self.extended:
560            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
561            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
562        else:
563            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
564        
565        return args[0] if args else None
def plog_fast( scheduler: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable], *args, **kwargs) -> Union[Any, NoneType]:
558    def put_log_fast(self, scheduler: CoroSchedulerType, *args, **kwargs) -> Optional[Any]:
559        if self.extended:
560            log_ex_request: LogExtended = self.log_extended_request(*args, **kwargs)
561            scheduler.get_service_instance_fast(Log).put_log_ex(*log_ex_request.args, **log_ex_request.kwargs)
562        else:
563            scheduler.get_service_instance_fast(Log).put_log(args, kwargs)
564        
565        return args[0] if args else None
def put_log(*args, **kwargs) -> Union[Any, NoneType]:
578    def put_log(self, *args, **kwargs) -> Optional[Any]:
579        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
def plog(*args, **kwargs) -> Union[Any, NoneType]:
578    def put_log(self, *args, **kwargs) -> Optional[Any]:
579        return self.put_log_fast(current_coro_scheduler(), *args, **kwargs)
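All of these helpers return their first positional argument (or None when called without positional arguments), so a value can be logged inline while being passed along. A small sketch (compute is hypothetical):

    def compute(i: Interface) -> int:
        return log_fast(i, 42) + 1   # logs 42 and still returns 43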