cengal.code_flow_control.call_history_reapplier.versions.v_0.call_history_reapplier
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18__all__ = ['CallHistoryReapplier', 'CoroPriority'] 19 20 21from typing import Dict, Tuple, Hashable, Any 22from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import * 23 24 25""" 26Module Docstring 27Docstrings: http://www.python.org/dev/peps/pep-0257/ 28""" 29 30__author__ = "ButenkoMS <gtalk@butenkoms.space>" 31__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 32__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 33__license__ = "Apache License, Version 2.0" 34__version__ = "4.4.1" 35__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 36__email__ = "gtalk@butenkoms.space" 37# __status__ = "Prototype" 38__status__ = "Development" 39# __status__ = "Production" 40 41 42class CallHistoryReapplier: 43 def __init__(self, priority: CoroPriority=CoroPriority.normal): 44 self.history: Dict = dict() 45 self.priority = priority 46 47 def call_impl(self, *args, **kwargs): 48 raise NotImplementedError 49 50 def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]: 51 raise NotImplementedError 52 53 def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]: 54 raise NotImplementedError 55 56 def __call__(self, *args, **kwargs): 57 key, value = self.args_to_key_value(*args, **kwargs) 58 self.history[key] = value 59 self.call_impl(*args, **kwargs) 60 61 def reapply(self): 62 ly = gly(self.priority) 63 new_history = dict() 64 while self.history: 65 history_buff = self.history 66 self.history = type(self.history)() 67 for key, value in history_buff.items(): 68 args, kwargs = self.key_value_to_args(key, value) 69 new_history[key] = value 70 self.call_impl(*args, **kwargs) 71 ly() 72 73 # in order to not lose call history made in intermediate of the reapplying process: 74 # self.history = new_history 75 new_history.update(self.history) 76 self.history = new_history 77 78 def destroy(self): 79 self.history = type(self.history)()
class
CallHistoryReapplier:
43class CallHistoryReapplier: 44 def __init__(self, priority: CoroPriority=CoroPriority.normal): 45 self.history: Dict = dict() 46 self.priority = priority 47 48 def call_impl(self, *args, **kwargs): 49 raise NotImplementedError 50 51 def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]: 52 raise NotImplementedError 53 54 def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]: 55 raise NotImplementedError 56 57 def __call__(self, *args, **kwargs): 58 key, value = self.args_to_key_value(*args, **kwargs) 59 self.history[key] = value 60 self.call_impl(*args, **kwargs) 61 62 def reapply(self): 63 ly = gly(self.priority) 64 new_history = dict() 65 while self.history: 66 history_buff = self.history 67 self.history = type(self.history)() 68 for key, value in history_buff.items(): 69 args, kwargs = self.key_value_to_args(key, value) 70 new_history[key] = value 71 self.call_impl(*args, **kwargs) 72 ly() 73 74 # in order to not lose call history made in intermediate of the reapplying process: 75 # self.history = new_history 76 new_history.update(self.history) 77 self.history = new_history 78 79 def destroy(self): 80 self.history = type(self.history)()
CallHistoryReapplier( priority: CoroPriority = <CoroPriority.normal: 1>)
def
reapply(self):
62 def reapply(self): 63 ly = gly(self.priority) 64 new_history = dict() 65 while self.history: 66 history_buff = self.history 67 self.history = type(self.history)() 68 for key, value in history_buff.items(): 69 args, kwargs = self.key_value_to_args(key, value) 70 new_history[key] = value 71 self.call_impl(*args, **kwargs) 72 ly() 73 74 # in order to not lose call history made in intermediate of the reapplying process: 75 # self.history = new_history 76 new_history.update(self.history) 77 self.history = new_history
class
CoroPriority(enum.Enum):
An enumeration.
high =
<CoroPriority.high: 0>
normal =
<CoroPriority.normal: 1>
low =
<CoroPriority.low: 2>
Inherited Members
- enum.Enum
- name
- value