cengal.code_flow_control.call_history_reapplier.versions.v_0.call_history_reapplier

 1#!/usr/bin/env python
 2# coding=utf-8
 3
 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
 5# 
 6# Licensed under the Apache License, Version 2.0 (the "License");
 7# you may not use this file except in compliance with the License.
 8# You may obtain a copy of the License at
 9# 
10#     http://www.apache.org/licenses/LICENSE-2.0
11# 
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18__all__ = ['CallHistoryReapplier', 'CoroPriority']
19
20
21from typing import Dict, Tuple, Hashable, Any
22from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import *
23
24
25"""
26Module Docstring
27Docstrings: http://www.python.org/dev/peps/pep-0257/
28"""
29
30__author__ = "ButenkoMS <gtalk@butenkoms.space>"
31__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
32__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
33__license__ = "Apache License, Version 2.0"
34__version__ = "4.4.1"
35__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
36__email__ = "gtalk@butenkoms.space"
37# __status__ = "Prototype"
38__status__ = "Development"
39# __status__ = "Production"
40
41
42class CallHistoryReapplier:
43    def __init__(self, priority: CoroPriority=CoroPriority.normal):
44        self.history: Dict = dict()
45        self.priority = priority
46    
47    def call_impl(self, *args, **kwargs):
48        raise NotImplementedError
49    
50    def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]:
51        raise NotImplementedError
52    
53    def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]:
54        raise NotImplementedError
55    
56    def __call__(self, *args, **kwargs):
57        key, value = self.args_to_key_value(*args, **kwargs)
58        self.history[key] = value
59        self.call_impl(*args, **kwargs)
60    
61    def reapply(self):
62        ly = gly(self.priority)
63        new_history = dict()
64        while self.history:
65            history_buff = self.history
66            self.history = type(self.history)()
67            for key, value in history_buff.items():
68                args, kwargs = self.key_value_to_args(key, value)
69                new_history[key] = value
70                self.call_impl(*args, **kwargs)
71                ly()
72        
73        # in order to not lose call history made in intermediate of the reapplying process:
74        # self.history = new_history
75        new_history.update(self.history)
76        self.history = new_history
77
78    def destroy(self):
79        self.history = type(self.history)()
class CallHistoryReapplier:
43class CallHistoryReapplier:
44    def __init__(self, priority: CoroPriority=CoroPriority.normal):
45        self.history: Dict = dict()
46        self.priority = priority
47    
48    def call_impl(self, *args, **kwargs):
49        raise NotImplementedError
50    
51    def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]:
52        raise NotImplementedError
53    
54    def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]:
55        raise NotImplementedError
56    
57    def __call__(self, *args, **kwargs):
58        key, value = self.args_to_key_value(*args, **kwargs)
59        self.history[key] = value
60        self.call_impl(*args, **kwargs)
61    
62    def reapply(self):
63        ly = gly(self.priority)
64        new_history = dict()
65        while self.history:
66            history_buff = self.history
67            self.history = type(self.history)()
68            for key, value in history_buff.items():
69                args, kwargs = self.key_value_to_args(key, value)
70                new_history[key] = value
71                self.call_impl(*args, **kwargs)
72                ly()
73        
74        # in order to not lose call history made in intermediate of the reapplying process:
75        # self.history = new_history
76        new_history.update(self.history)
77        self.history = new_history
78
79    def destroy(self):
80        self.history = type(self.history)()
CallHistoryReapplier( priority: CoroPriority = <CoroPriority.normal: 1>)
44    def __init__(self, priority: CoroPriority=CoroPriority.normal):
45        self.history: Dict = dict()
46        self.priority = priority
history: Dict
priority
def call_impl(self, *args, **kwargs):
48    def call_impl(self, *args, **kwargs):
49        raise NotImplementedError
def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]:
51    def args_to_key_value(self, *args, **kwargs) -> Tuple[Hashable, Any]:
52        raise NotImplementedError
def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]:
54    def key_value_to_args(self, key: Hashable, value: Any) -> Tuple[Tuple, Dict]:
55        raise NotImplementedError
def reapply(self):
62    def reapply(self):
63        ly = gly(self.priority)
64        new_history = dict()
65        while self.history:
66            history_buff = self.history
67            self.history = type(self.history)()
68            for key, value in history_buff.items():
69                args, kwargs = self.key_value_to_args(key, value)
70                new_history[key] = value
71                self.call_impl(*args, **kwargs)
72                ly()
73        
74        # in order to not lose call history made in intermediate of the reapplying process:
75        # self.history = new_history
76        new_history.update(self.history)
77        self.history = new_history
def destroy(self):
79    def destroy(self):
80        self.history = type(self.history)()
class CoroPriority(enum.Enum):
59class CoroPriority(Enum):
60    high = 0
61    normal = 1
62    low = 2

An enumeration.

high = <CoroPriority.high: 0>
normal = <CoroPriority.normal: 1>
low = <CoroPriority.low: 2>
Inherited Members
enum.Enum
name
value