cengal.parallel_execution.coroutines.coro_standard_services.lazy_print.versions.v_0.lazy_print
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['LazyPrint', 'lazy_print', 'lprint', 'lp']


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""


from time import perf_counter
from typing import Dict, List, Tuple

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import TimerFuncRunner, add_timer_func_run_from_other_service, discard_timer_func_run_from_other_service

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class LazyPrint(TypedService[None]):
    """Service that buffers ``print()`` calls and emits them in batches.

    Each request is stored as an ``(args, kwargs)`` pair in ``self.requests``.
    A timer registered through ``add_timer_func_run_from_other_service`` calls
    ``timer_handler``, which makes the service live when something is
    buffered. ``full_processing_iteration`` then prints the whole buffer and
    flushes once at the end instead of after every call.
    """

    def __init__(self, loop):
        super().__init__(loop)
        # Buffered print requests: (args, kwargs) pairs, in arrival order.
        self.requests = []
        # Period handed to the timer helper (presumably seconds - confirm
        # against TimerFuncRunner).
        self.update_period = 1 / 60
        # Value returned by the timer helper for the last registered timer;
        # reset to None by timer_handler() once that timer has fired.
        self.update_timer_request = None

    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, None, None]:
        """Buffer one print request that arrived through the scheduler.

        Returns:
            ``(True, None, None)``. NOTE(review): presumably
            ``(completed, result, exception)`` per the Service contract -
            confirm against the base class.
        """
        self._put_direct_request(*args, **kwargs)
        return (True, None, None)

    def full_processing_iteration(self):
        """Print every buffered request, flush once, then re-arm the timer."""
        pending = self.requests
        # Swap in a fresh container of the same type before printing.
        self.requests = type(pending)()

        # Streams other than the default one for which a caller explicitly
        # asked for flush=True. Keyed by id() because file-like objects are
        # not guaranteed to be hashable.
        flush_targets = {}
        for args, kwargs in pending:
            target = kwargs.get('file')
            if kwargs.get('flush') and target is not None:
                flush_targets[id(target)] = target
            # Per-call flushing is suppressed; flushing happens once below.
            kwargs['flush'] = False
            print(*args, **kwargs)

        # Flush the default stream (sys.stdout) once for the whole batch.
        print(end='', flush=True)
        # Honour explicit flush requests for custom streams; without this
        # they would be silently dropped by the flush=False override above.
        for target in flush_targets.values():
            target.flush()

        self.add_timer()
        self.make_dead()

    def timer_handler(self):
        """Timer callback: clear the timer handle and wake up if work exists."""
        self.update_timer_request = None
        if self.in_work_impl():
            self.make_live()

    def add_timer(self):
        """Register ``timer_handler`` with the timer helper and keep its handle."""
        # NOTE(review): the meaning of the positional ``True`` is defined by
        # add_timer_func_run_from_other_service - confirm there.
        self.update_timer_request = add_timer_func_run_from_other_service(
            self, True, self.update_period, self.timer_handler)

    def try_add_timer(self):
        """Register a timer only when no timer handle is currently stored."""
        if self.update_timer_request is None:
            self.add_timer()

    def _put_direct_request(self, *args, **kwargs):
        """Buffer one print request and make sure a timer is registered."""
        self.requests.append((args, kwargs))
        self.try_add_timer()

    def in_work_impl(self) -> bool:
        """Return True when at least one print request is buffered."""
        return bool(self.requests)

    def in_work(self) -> bool:
        """Return ``in_work_impl()`` passed through ``thrifty_in_work()``."""
        result: bool = self.in_work_impl()
        return self.thrifty_in_work(result)

    def destroy(self):
        """Discard the stored timer request; ignore a scheduler being destroyed."""
        try:
            discard_timer_func_run_from_other_service(self, self.update_timer_request)
        except CoroSchedulerIsCurrentlyDestroingError:
            pass


def lazy_print(*args, **kwargs):
    """Print through the ``LazyPrint`` service when a usable scheduler exists.

    The current coroutine scheduler is tried first, then the primary one. A
    scheduler is usable when it can be obtained and its ``_destroyed`` flag
    is false. When neither is usable the call falls back to the built-in
    ``print(*args, **kwargs)``.
    """
    try:
        cs = current_coro_scheduler()
        usable = not cs._destroyed
    except OutsideCoroSchedulerContext:
        usable = False

    if not usable:
        try:
            cs = primary_coro_scheduler()
            usable = not cs._destroyed
        except PrimaryCoroSchedulerWasNotSet:
            usable = False

    if usable:
        cs.get_service_instance(LazyPrint)._put_direct_request(*args, **kwargs)
    else:
        print(*args, **kwargs)


lprint = lazy_print
lp = lazy_print
class
LazyPrint(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[NoneType]):
class LazyPrint(TypedService[None]):
    """Service that buffers ``print()`` calls and emits them in batches.

    Each request is stored as an ``(args, kwargs)`` pair in ``self.requests``.
    A timer registered through ``add_timer_func_run_from_other_service`` calls
    ``timer_handler``, which makes the service live when something is
    buffered. ``full_processing_iteration`` then prints the whole buffer and
    flushes once at the end instead of after every call.
    """

    def __init__(self, loop):
        super().__init__(loop)
        # Buffered print requests: (args, kwargs) pairs, in arrival order.
        self.requests = []
        # Period handed to the timer helper (presumably seconds - confirm
        # against TimerFuncRunner).
        self.update_period = 1 / 60
        # Value returned by the timer helper for the last registered timer;
        # reset to None by timer_handler() once that timer has fired.
        self.update_timer_request = None

    def single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, None, None]:
        """Buffer one print request that arrived through the scheduler.

        Returns:
            ``(True, None, None)``. NOTE(review): presumably
            ``(completed, result, exception)`` per the Service contract -
            confirm against the base class.
        """
        self._put_direct_request(*args, **kwargs)
        return (True, None, None)

    def full_processing_iteration(self):
        """Print every buffered request, flush once, then re-arm the timer."""
        pending = self.requests
        # Swap in a fresh container of the same type before printing.
        self.requests = type(pending)()

        # Streams other than the default one for which a caller explicitly
        # asked for flush=True. Keyed by id() because file-like objects are
        # not guaranteed to be hashable.
        flush_targets = {}
        for args, kwargs in pending:
            target = kwargs.get('file')
            if kwargs.get('flush') and target is not None:
                flush_targets[id(target)] = target
            # Per-call flushing is suppressed; flushing happens once below.
            kwargs['flush'] = False
            print(*args, **kwargs)

        # Flush the default stream (sys.stdout) once for the whole batch.
        print(end='', flush=True)
        # Honour explicit flush requests for custom streams; without this
        # they would be silently dropped by the flush=False override above.
        for target in flush_targets.values():
            target.flush()

        self.add_timer()
        self.make_dead()

    def timer_handler(self):
        """Timer callback: clear the timer handle and wake up if work exists."""
        self.update_timer_request = None
        if self.in_work_impl():
            self.make_live()

    def add_timer(self):
        """Register ``timer_handler`` with the timer helper and keep its handle."""
        # NOTE(review): the meaning of the positional ``True`` is defined by
        # add_timer_func_run_from_other_service - confirm there.
        self.update_timer_request = add_timer_func_run_from_other_service(
            self, True, self.update_period, self.timer_handler)

    def try_add_timer(self):
        """Register a timer only when no timer handle is currently stored."""
        if self.update_timer_request is None:
            self.add_timer()

    def _put_direct_request(self, *args, **kwargs):
        """Buffer one print request and make sure a timer is registered."""
        self.requests.append((args, kwargs))
        self.try_add_timer()

    def in_work_impl(self) -> bool:
        """Return True when at least one print request is buffered."""
        return bool(self.requests)

    def in_work(self) -> bool:
        """Return ``in_work_impl()`` passed through ``thrifty_in_work()``."""
        result: bool = self.in_work_impl()
        return self.thrifty_in_work(result)

    def destroy(self):
        """Discard the stored timer request; ignore a scheduler being destroyed."""
        try:
            discard_timer_func_run_from_other_service(self, self.update_timer_request)
        except CoroSchedulerIsCurrentlyDestroingError:
            pass
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def
single_task_registration_or_immediate_processing(self, *args, **kwargs) -> Tuple[bool, NoneType, NoneType]:
def
in_work(self) -> bool:
def in_work(self) -> bool:
    """Report whether the service has work, filtered by ``thrifty_in_work``.

    The raw answer comes from ``in_work_impl()`` and is handed to
    ``thrifty_in_work()``, whose return value is returned unchanged.
    """
    return self.thrifty_in_work(self.in_work_impl())
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
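Raises: NotImplementedError: presumably raised by the base-class implementation when a subclass does not override this method; the LazyPrint override shown above does not raise it.
Returns: bool: for LazyPrint, the result of thrifty_in_work() applied to whether any print requests are currently buffered.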
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
def
lazy_print(*args, **kwargs):
def lazy_print(*args, **kwargs):
    """Print through the ``LazyPrint`` service when a usable scheduler exists.

    The current coroutine scheduler is tried first, then the primary one. A
    scheduler is usable when it can be obtained and its ``_destroyed`` flag
    is false. When neither is usable the call falls back to the built-in
    ``print(*args, **kwargs)``.
    """
    scheduler = None
    lookups = (
        (current_coro_scheduler, OutsideCoroSchedulerContext),
        (primary_coro_scheduler, PrimaryCoroSchedulerWasNotSet),
    )
    for obtain, unavailable in lookups:
        try:
            candidate = obtain()
            alive = not candidate._destroyed
        except unavailable:
            continue
        if alive:
            scheduler = candidate
            break

    if scheduler is None:
        print(*args, **kwargs)
        return

    scheduler.get_service_instance(LazyPrint)._put_direct_request(*args, **kwargs)
def
lprint(*args, **kwargs):
def lazy_print(*args, **kwargs):
    """Queue a print on the ``LazyPrint`` service, or print directly.

    Tries the current coroutine scheduler, then the primary scheduler; a
    scheduler whose ``_destroyed`` flag is set is treated as unavailable.
    Falls back to the built-in ``print(*args, **kwargs)`` when no scheduler
    can be used.
    """
    try:
        cs = current_coro_scheduler()
        usable = not cs._destroyed
    except OutsideCoroSchedulerContext:
        usable = False

    if not usable:
        try:
            cs = primary_coro_scheduler()
            usable = not cs._destroyed
        except PrimaryCoroSchedulerWasNotSet:
            usable = False

    if usable:
        cs.get_service_instance(LazyPrint)._put_direct_request(*args, **kwargs)
    else:
        print(*args, **kwargs)
def
lp(*args, **kwargs):
def lazy_print(*args, **kwargs):
    """Send a print request to ``LazyPrint`` or fall back to ``print()``.

    The current coroutine scheduler is preferred and the primary scheduler
    is the second choice; either is skipped when it cannot be obtained or
    its ``_destroyed`` flag is set. With no usable scheduler the arguments
    go straight to the built-in ``print``.
    """
    def _usable_scheduler(getter, missing_error):
        # Return the scheduler from ``getter`` unless it is unavailable
        # (``missing_error`` raised) or already destroyed.
        try:
            found = getter()
            return None if found._destroyed else found
        except missing_error:
            return None

    scheduler = _usable_scheduler(current_coro_scheduler, OutsideCoroSchedulerContext)
    if scheduler is None:
        scheduler = _usable_scheduler(primary_coro_scheduler, PrimaryCoroSchedulerWasNotSet)

    if scheduler is None:
        print(*args, **kwargs)
        return

    scheduler.get_service_instance(LazyPrint)._put_direct_request(*args, **kwargs)