cengal.parallel_execution.coroutines.coro_standard_services.fast_aggregator.versions.v_0.fast_aggregator

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18"""
 19Module Docstring
 20Docstrings: http://www.python.org/dev/peps/pep-0257/
 21"""
 22
 23__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 26__license__ = "Apache License, Version 2.0"
 27__version__ = "4.4.1"
 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 29__email__ = "gtalk@butenkoms.space"
 30# __status__ = "Prototype"
 31__status__ = "Development"
 32# __status__ = "Production"
 33
 34
 35__all__ = ['FastAggregatorRequest', 'FastAggregatorWaitFor', 'FastAggregatorPutSingle', 'FastAggregatorPutMultiple', 'FastAggregator', 'FastAggregatorWriter', 'FastAggregatorClient']
 36
 37
 38from collections import deque
 39
 40from cengal.parallel_execution.coroutines.coro_scheduler import *
 41from cengal.introspection.inspect import get_exception
 42from typing import Hashable, Tuple, Dict, List, Any, Optional, Set, Deque
 43
 44
 45class FastAggregatorWaitFor(TypedServiceRequest[Any]):
 46    def __call__(self, key: Hashable, queued: bool = True) -> 'FastAggregatorWaitFor':
 47        """Will block coroutine untill result is ready
 48
 49        Args:
 50            key (Hashable): _description_
 51
 52        Returns:
 53            ServiceRequest: _description_
 54        """        
 55        return self._save(self.default__request__type__, key, queued)
 56
 57
 58class FastAggregatorPutSingle(TypedServiceRequest[None]):
 59    default__request__type__ = 1
 60
 61    def __call__(self, key: Hashable, data: Any) -> 'FastAggregatorPutSingle':
 62        return self._save(self.default__request__type__, key, data)
 63
 64
 65class FastAggregatorPutMultiple(TypedServiceRequest[None]):
 66    default__request__type__ = 2
 67    
 68    def __call__(self, data: Dict[Hashable, List[Any]]) -> 'FastAggregatorPutMultiple':
 69        return self._save(self.default__request__type__, data)
 70
 71
 72class FastAggregatorRequest(ServiceRequest):
 73    def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor:
 74        """Will block coroutine untill result is ready
 75
 76        Args:
 77            key (Hashable): _description_
 78            queued (bool, optional): When True - works like loadbalansers works. Otherwise gets same link to data as a first requester in a queue. Defaults to True.
 79
 80        Returns:
 81            Any: _description_
 82        """        
 83        return FastAggregatorWaitFor[None]()(key, queued)
 84
 85    def put_single(self, key: Hashable, data: Any) -> FastAggregatorPutSingle:
 86        return FastAggregatorPutSingle[None]()(key, data)
 87
 88    def put_multiple(self, data: Dict[Hashable, List[Any]]) -> FastAggregatorPutMultiple:
 89        return FastAggregatorPutMultiple[None]()(data)
 90
 91
 92class FastAggregator(DualImmediateProcessingServiceMixin, TypedService[Any]):
 93    def __init__(self, loop: CoroSchedulerType):
 94        super().__init__(loop)
 95        self.data: Dict[Hashable, List[Any]] = dict()
 96        self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict()
 97        self._request_workers = {
 98            0: self._on_wait,
 99            1: self._on_put_single,
100            2: self._on_put_multiple,
101        }
102
103    def single_task_registration_or_immediate_processing_single(
104            self, key: Hashable) -> Tuple[bool, None, None]:
105        exception = None
106        result = None
107        try:
108            result = self.data[key]
109            del self.data[key]
110        except:
111            exception = get_exception()
112        
113        return True, result, exception
114    
115    def _on_wait(self, key: Hashable, queued: bool = True) -> Tuple[bool, None, None]:
116        if key not in self.requester_by_key:
117            self.requester_by_key[key] = deque()
118        
119        if (not queued) and (self.requester_by_key[key]):
120            self.requester_by_key[key][0].add(self.current_caller_coro_info.coro_id)
121        else:
122            self.requester_by_key[key].append({self.current_caller_coro_info.coro_id})
123
124        self.make_live()
125        return False, None, None
126    
127    def _on_put_single(self, key: Hashable, data: Any) -> Tuple[bool, None, None]:
128        self.put_single(key, data)
129        return True, None, None
130    
131    def _on_put_multiple(self, data: Dict[Hashable, List[Any]]) -> Tuple[bool, None, None]:
132        self.put_multiple(data)
133        return True, None, None
134    
135    def put_single(self, key: Hashable, data: Any):
136        if key not in self.data:
137            self.data[key] = list()
138
139        if (key in self.requester_by_key) and self.requester_by_key[key]:
140            self.make_live()
141
142        self.data[key].append(data)
143    
144    def put_multiple(self, data: Dict[Hashable, List[Any]]):
145        need_to_make_live = False
146        
147        for key in data.keys():
148            if key not in self.data:
149                self.data[key] = list()
150
151            if (key in self.requester_by_key) and self.requester_by_key[key]:
152                need_to_make_live = True
153            
154            self.data[key].extend(data[key])
155        
156        if need_to_make_live:
157            self.make_live()
158
159    def full_processing_iteration(self):
160        requester_by_key_bak = self.requester_by_key
161        possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys())
162        for key in possible_keys:
163            key_requesters_coro_ids_bak = requester_by_key_bak[key]
164            if key_requesters_coro_ids_bak:
165                first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft()
166                data = self.data[key]
167                for first_requester_coro_id in first_requesters_coro_ids:
168                    self.register_response(first_requester_coro_id, data, None)
169                
170                del self.data[key]
171            
172            if not key_requesters_coro_ids_bak:
173                del requester_by_key_bak[key]
174        
175        self.make_dead()
176
177    def in_work(self) -> bool:
178        return self.thrifty_in_work(bool(self.requester_by_key))
179
180
# Bind every request class to the service that handles it. The assignments
# live here because the request classes are defined before FastAggregator,
# so the class cannot be referenced inside their bodies.
FastAggregatorWaitFor.default_service_type = FastAggregator
FastAggregatorPutSingle.default_service_type = FastAggregator
FastAggregatorPutMultiple.default_service_type = FastAggregator
184
185
class FastAggregatorWriter:
    """Callable that pushes values into the ``FastAggregator`` of the current loop.

    While no service instance can be found, values are collected in
    ``data_buffer``. On the first call made after the service becomes
    available the buffer is flushed to the service exactly once.
    """

    def __init__(self) -> None:
        # Service instance; stays None until try_find_server() succeeds.
        self.server: Optional[FastAggregator] = None
        # Values written before the service was found, grouped by key.
        self.data_buffer: Dict[Hashable, List[Any]] = dict()

    def try_find_server(self):
        """Look up the ``FastAggregator`` instance of the current loop, if there is a loop."""
        loop = CoroScheduler.current_loop()
        if loop is not None:
            self.server = loop.get_service_instance(FastAggregator)

    def __call__(self, key: Hashable, data: Any) -> None:
        """Store ``data`` under ``key`` in the service, or buffer it locally.

        Args:
            key (Hashable): Key the value belongs to.
            data (Any): Value to store.
        """
        if self.server is None:
            self.try_find_server()

        server = self.server
        if server is None:
            # No service yet: keep the value until one is available.
            self.data_buffer.setdefault(key, []).append(data)
            return

        if self.data_buffer:
            server.put_multiple(self.data_buffer)
            # Drop the flushed values; otherwise every later call would send
            # them again and duplicate them in the aggregator. A new dict is
            # bound instead of clearing in place so the object handed to the
            # server is never mutated afterwards.
            self.data_buffer = dict()

        server.put_single(key, data)
211
# Alias: FastAggregatorClient is the same class object as FastAggregatorWriter.
FastAggregatorClient = FastAggregatorWriter
class FastAggregatorRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
73class FastAggregatorRequest(ServiceRequest):
74    def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor:
75        """Will block coroutine untill result is ready
76
77        Args:
78            key (Hashable): _description_
79            queued (bool, optional): When True - works like loadbalansers works. Otherwise gets same link to data as a first requester in a queue. Defaults to True.
80
81        Returns:
82            Any: _description_
83        """        
84        return FastAggregatorWaitFor[None]()(key, queued)
85
86    def put_single(self, key: Hashable, data: Any) -> FastAggregatorPutSingle:
87        return FastAggregatorPutSingle[None]()(key, data)
88
89    def put_multiple(self, data: Dict[Hashable, List[Any]]) -> FastAggregatorPutMultiple:
90        return FastAggregatorPutMultiple[None]()(data)
def wait( self, key: typing.Hashable, queued: bool = True) -> FastAggregatorWaitFor:
74    def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor:
75        """Will block coroutine untill result is ready
76
77        Args:
78            key (Hashable): _description_
79            queued (bool, optional): When True - works like loadbalansers works. Otherwise gets same link to data as a first requester in a queue. Defaults to True.
80
81        Returns:
82            Any: _description_
83        """        
84        return FastAggregatorWaitFor[None]()(key, queued)

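Blocks the coroutine until the result is ready.

Args: key (Hashable): Key whose data the coroutine waits for. queued (bool, optional): When True, the requester takes its own place in the key's queue, so waiting coroutines are served one after another, the way a load balancer distributes work. When False, the requester joins the first requester already in the queue and receives a reference to the same data. Defaults to True.

Returns: FastAggregatorWaitFor: The prepared wait request.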

def put_single( self, key: typing.Hashable, data: typing.Any) -> FastAggregatorPutSingle:
86    def put_single(self, key: Hashable, data: Any) -> FastAggregatorPutSingle:
87        return FastAggregatorPutSingle[None]()(key, data)
def put_multiple( self, data: typing.Dict[typing.Hashable, typing.List[typing.Any]]) -> FastAggregatorPutMultiple:
89    def put_multiple(self, data: Dict[Hashable, List[Any]]) -> FastAggregatorPutMultiple:
90        return FastAggregatorPutMultiple[None]()(data)
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default_service_type
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class FastAggregatorWaitFor(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedServiceRequest[typing.Any]):
46class FastAggregatorWaitFor(TypedServiceRequest[Any]):
47    def __call__(self, key: Hashable, queued: bool = True) -> 'FastAggregatorWaitFor':
48        """Will block coroutine untill result is ready
49
50        Args:
51            key (Hashable): _description_
52
53        Returns:
54            ServiceRequest: _description_
55        """        
56        return self._save(self.default__request__type__, key, queued)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'FastAggregator'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class FastAggregatorPutSingle(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedServiceRequest[NoneType]):
59class FastAggregatorPutSingle(TypedServiceRequest[None]):
60    default__request__type__ = 1
61
62    def __call__(self, key: Hashable, data: Any) -> 'FastAggregatorPutSingle':
63        return self._save(self.default__request__type__, key, data)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

default__request__type__: int = 1
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'FastAggregator'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class FastAggregatorPutMultiple(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedServiceRequest[NoneType]):
66class FastAggregatorPutMultiple(TypedServiceRequest[None]):
67    default__request__type__ = 2
68    
69    def __call__(self, data: Dict[Hashable, List[Any]]) -> 'FastAggregatorPutMultiple':
70        return self._save(self.default__request__type__, data)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

default__request__type__: int = 2
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'FastAggregator'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class FastAggregator(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.TypedService[typing.Any]):
 93class FastAggregator(DualImmediateProcessingServiceMixin, TypedService[Any]):
 94    def __init__(self, loop: CoroSchedulerType):
 95        super().__init__(loop)
 96        self.data: Dict[Hashable, List[Any]] = dict()
 97        self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict()
 98        self._request_workers = {
 99            0: self._on_wait,
100            1: self._on_put_single,
101            2: self._on_put_multiple,
102        }
103
104    def single_task_registration_or_immediate_processing_single(
105            self, key: Hashable) -> Tuple[bool, None, None]:
106        exception = None
107        result = None
108        try:
109            result = self.data[key]
110            del self.data[key]
111        except:
112            exception = get_exception()
113        
114        return True, result, exception
115    
116    def _on_wait(self, key: Hashable, queued: bool = True) -> Tuple[bool, None, None]:
117        if key not in self.requester_by_key:
118            self.requester_by_key[key] = deque()
119        
120        if (not queued) and (self.requester_by_key[key]):
121            self.requester_by_key[key][0].add(self.current_caller_coro_info.coro_id)
122        else:
123            self.requester_by_key[key].append({self.current_caller_coro_info.coro_id})
124
125        self.make_live()
126        return False, None, None
127    
128    def _on_put_single(self, key: Hashable, data: Any) -> Tuple[bool, None, None]:
129        self.put_single(key, data)
130        return True, None, None
131    
132    def _on_put_multiple(self, data: Dict[Hashable, List[Any]]) -> Tuple[bool, None, None]:
133        self.put_multiple(data)
134        return True, None, None
135    
136    def put_single(self, key: Hashable, data: Any):
137        if key not in self.data:
138            self.data[key] = list()
139
140        if (key in self.requester_by_key) and self.requester_by_key[key]:
141            self.make_live()
142
143        self.data[key].append(data)
144    
145    def put_multiple(self, data: Dict[Hashable, List[Any]]):
146        need_to_make_live = False
147        
148        for key in data.keys():
149            if key not in self.data:
150                self.data[key] = list()
151
152            if (key in self.requester_by_key) and self.requester_by_key[key]:
153                need_to_make_live = True
154            
155            self.data[key].extend(data[key])
156        
157        if need_to_make_live:
158            self.make_live()
159
160    def full_processing_iteration(self):
161        requester_by_key_bak = self.requester_by_key
162        possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys())
163        for key in possible_keys:
164            key_requesters_coro_ids_bak = requester_by_key_bak[key]
165            if key_requesters_coro_ids_bak:
166                first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft()
167                data = self.data[key]
168                for first_requester_coro_id in first_requesters_coro_ids:
169                    self.register_response(first_requester_coro_id, data, None)
170                
171                del self.data[key]
172            
173            if not key_requesters_coro_ids_bak:
174                del requester_by_key_bak[key]
175        
176        self.make_dead()
177
178    def in_work(self) -> bool:
179        return self.thrifty_in_work(bool(self.requester_by_key))

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

FastAggregator( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
 94    def __init__(self, loop: CoroSchedulerType):
 95        super().__init__(loop)
 96        self.data: Dict[Hashable, List[Any]] = dict()
 97        self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict()
 98        self._request_workers = {
 99            0: self._on_wait,
100            1: self._on_put_single,
101            2: self._on_put_multiple,
102        }
data: Dict[Hashable, List[Any]]
requester_by_key: Dict[Hashable, Deque[Set[int]]]
def put_single(self, key: typing.Hashable, data: typing.Any):
136    def put_single(self, key: Hashable, data: Any):
137        if key not in self.data:
138            self.data[key] = list()
139
140        if (key in self.requester_by_key) and self.requester_by_key[key]:
141            self.make_live()
142
143        self.data[key].append(data)
def put_multiple(self, data: typing.Dict[typing.Hashable, typing.List[typing.Any]]):
145    def put_multiple(self, data: Dict[Hashable, List[Any]]):
146        need_to_make_live = False
147        
148        for key in data.keys():
149            if key not in self.data:
150                self.data[key] = list()
151
152            if (key in self.requester_by_key) and self.requester_by_key[key]:
153                need_to_make_live = True
154            
155            self.data[key].extend(data[key])
156        
157        if need_to_make_live:
158            self.make_live()
def full_processing_iteration(self):
160    def full_processing_iteration(self):
161        requester_by_key_bak = self.requester_by_key
162        possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys())
163        for key in possible_keys:
164            key_requesters_coro_ids_bak = requester_by_key_bak[key]
165            if key_requesters_coro_ids_bak:
166                first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft()
167                data = self.data[key]
168                for first_requester_coro_id in first_requesters_coro_ids:
169                    self.register_response(first_requester_coro_id, data, None)
170                
171                del self.data[key]
172            
173            if not key_requesters_coro_ids_bak:
174                del requester_by_key_bak[key]
175        
176        self.make_dead()
def in_work(self) -> bool:
178    def in_work(self) -> bool:
179        return self.thrifty_in_work(bool(self.requester_by_key))

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
single_task_registration_or_immediate_processing
single_task_registration_or_immediate_processing_multiple
single_task_registration_or_immediate_processing_single
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
class FastAggregatorWriter:
187class FastAggregatorWriter:
188    def __init__(self) -> None:
189        self.server:Optional[FastAggregator] = None
190        self.data_buffer: Dict[Hashable, List[Any]] = dict()
191    
192    def try_find_server(self):
193        loop = CoroScheduler.current_loop()
194        if loop is not None:
195            self.server = loop.get_service_instance(FastAggregator)
196    
197    def __call__(self, key: Hashable, data: Any) -> Any:
198        if self.server is None:
199            self.try_find_server()
200        
201        if self.server is None:
202            if key not in self.data_buffer:
203                self.data_buffer[key] = list()
204            
205            self.data_buffer[key].append(data)
206        else:
207            if self.data_buffer:
208                self.server.put_multiple(self.data_buffer)
209            
210            self.server.put_single(key, data)
server: Union[FastAggregator, NoneType]
data_buffer: Dict[Hashable, List[Any]]
def try_find_server(self):
192    def try_find_server(self):
193        loop = CoroScheduler.current_loop()
194        if loop is not None:
195            self.server = loop.get_service_instance(FastAggregator)
FastAggregatorClient = <class 'FastAggregatorWriter'>