cengal.parallel_execution.coroutines.coro_standard_services.fast_aggregator.versions.v_0.fast_aggregator
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
FastAggregator service for the Cengal coroutine scheduler.

Producers push data items under a hashable key (``put_single`` /
``put_multiple``); consumer coroutines block on ``wait(key)`` until data for
that key is available, at which point the accumulated list for the key is
delivered to the waiter(s) and the key's buffer is dropped.

Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['FastAggregatorRequest', 'FastAggregatorWaitFor', 'FastAggregatorPutSingle', 'FastAggregatorPutMultiple', 'FastAggregator', 'FastAggregatorWriter', 'FastAggregatorClient']


from collections import deque

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.introspection.inspect import get_exception
from typing import Hashable, Tuple, Dict, List, Any, Optional, Set, Deque


class FastAggregatorWaitFor(TypedServiceRequest[Any]):
    # Uses the request type inherited from ServiceRequest; per
    # FastAggregator._request_workers that type is routed to _on_wait.

    def __call__(self, key: Hashable, queued: bool = True) -> 'FastAggregatorWaitFor':
        """Will block the coroutine until a result for `key` is ready.

        Args:
            key (Hashable): aggregation key to wait on.
            queued (bool, optional): When True - each waiter forms its own
                queue entry (load-balancer-like distribution). Otherwise the
                waiter joins the first requester group in the queue and
                receives the same link to the data. Defaults to True.

        Returns:
            FastAggregatorWaitFor: this request object, ready to be yielded.
        """
        return self._save(self.default__request__type__, key, queued)


class FastAggregatorPutSingle(TypedServiceRequest[None]):
    default__request__type__ = 1  # routed to FastAggregator._on_put_single

    def __call__(self, key: Hashable, data: Any) -> 'FastAggregatorPutSingle':
        """Append a single `data` item under `key`."""
        return self._save(self.default__request__type__, key, data)


class FastAggregatorPutMultiple(TypedServiceRequest[None]):
    default__request__type__ = 2  # routed to FastAggregator._on_put_multiple

    def __call__(self, data: Dict[Hashable, List[Any]]) -> 'FastAggregatorPutMultiple':
        """Extend several keys at once with lists of items."""
        return self._save(self.default__request__type__, data)


class FastAggregatorRequest(ServiceRequest):
    """Convenience facade that builds the concrete FastAggregator requests."""

    def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor:
        """Will block the coroutine until a result for `key` is ready.

        Args:
            key (Hashable): aggregation key to wait on.
            queued (bool, optional): When True - works like load balancers do.
                Otherwise gets the same link to data as the first requester in
                the queue. Defaults to True.

        Returns:
            FastAggregatorWaitFor: the prepared wait request.
        """
        return FastAggregatorWaitFor[None]()(key, queued)

    def put_single(self, key: Hashable, data: Any) -> FastAggregatorPutSingle:
        """Build a request that appends one item under `key`."""
        return FastAggregatorPutSingle[None]()(key, data)

    def put_multiple(self, data: Dict[Hashable, List[Any]]) -> FastAggregatorPutMultiple:
        """Build a request that extends several keys with item lists."""
        return FastAggregatorPutMultiple[None]()(data)


class FastAggregator(DualImmediateProcessingServiceMixin, TypedService[Any]):
    """Per-key data aggregation service.

    State:
        data: accumulated items per key; the whole list is handed out on wait.
        requester_by_key: FIFO of waiter groups per key; each group (a set of
            coro ids) receives the same data link when the key is served.
    """

    def __init__(self, loop: CoroSchedulerType):
        super().__init__(loop)
        self.data: Dict[Hashable, List[Any]] = dict()
        self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict()
        # Dispatch table: request type -> handler (see the request classes above).
        self._request_workers = {
            0: self._on_wait,
            1: self._on_put_single,
            2: self._on_put_multiple,
        }

    def single_task_registration_or_immediate_processing_single(
            self, key: Hashable) -> Tuple[bool, None, None]:
        """Immediately hand out (and drop) the accumulated data for `key`.

        Returns (True, data, None) on success or (True, None, exception)
        when `key` has no data yet.
        """
        exception = None
        result = None
        try:
            result = self.data[key]
            del self.data[key]
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed; a missing key is reported via the captured
            # exception rather than raised.
            exception = get_exception()

        return True, result, exception

    def _on_wait(self, key: Hashable, queued: bool = True) -> Tuple[bool, None, None]:
        """Register the calling coroutine as a waiter for `key` (blocking)."""
        if key not in self.requester_by_key:
            self.requester_by_key[key] = deque()

        if (not queued) and (self.requester_by_key[key]):
            # Join the front group: share the same data link with the first
            # requester instead of queueing a new delivery slot.
            self.requester_by_key[key][0].add(self.current_caller_coro_info.coro_id)
        else:
            self.requester_by_key[key].append({self.current_caller_coro_info.coro_id})

        self.make_live()
        return False, None, None

    def _on_put_single(self, key: Hashable, data: Any) -> Tuple[bool, None, None]:
        """Request handler wrapper around put_single(); completes immediately."""
        self.put_single(key, data)
        return True, None, None

    def _on_put_multiple(self, data: Dict[Hashable, List[Any]]) -> Tuple[bool, None, None]:
        """Request handler wrapper around put_multiple(); completes immediately."""
        self.put_multiple(data)
        return True, None, None

    def put_single(self, key: Hashable, data: Any):
        """Append one item under `key`; wake the service if someone waits on it."""
        if key not in self.data:
            self.data[key] = list()

        if (key in self.requester_by_key) and self.requester_by_key[key]:
            self.make_live()

        self.data[key].append(data)

    def put_multiple(self, data: Dict[Hashable, List[Any]]):
        """Extend several keys at once; wake the service once if any key has waiters."""
        need_to_make_live = False

        for key in data.keys():
            if key not in self.data:
                self.data[key] = list()

            if (key in self.requester_by_key) and self.requester_by_key[key]:
                need_to_make_live = True

            self.data[key].extend(data[key])

        if need_to_make_live:
            self.make_live()

    def full_processing_iteration(self):
        """Deliver data to the oldest waiter group of every key that has data.

        Each served key's data list is handed (as the same link) to every
        coroutine in the front waiter group, then removed from `self.data`.
        """
        requester_by_key_bak = self.requester_by_key
        possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys())
        for key in possible_keys:
            key_requesters_coro_ids_bak = requester_by_key_bak[key]
            if key_requesters_coro_ids_bak:
                first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft()
                data = self.data[key]
                for first_requester_coro_id in first_requesters_coro_ids:
                    self.register_response(first_requester_coro_id, data, None)

                del self.data[key]

            if not key_requesters_coro_ids_bak:
                del requester_by_key_bak[key]

        self.make_dead()

    def in_work(self) -> bool:
        return self.thrifty_in_work(bool(self.requester_by_key))


FastAggregatorWaitFor.default_service_type = FastAggregator
FastAggregatorPutSingle.default_service_type = FastAggregator
FastAggregatorPutMultiple.default_service_type = FastAggregator


class FastAggregatorWriter:
    """Producer-side helper usable before the scheduler/service exists.

    Items pushed while no FastAggregator instance is reachable are buffered
    locally and flushed to the service on the first successful call.
    """

    def __init__(self) -> None:
        self.server: Optional[FastAggregator] = None
        self.data_buffer: Dict[Hashable, List[Any]] = dict()

    def try_find_server(self):
        """Look up the FastAggregator instance of the current loop, if any."""
        loop = CoroScheduler.current_loop()
        if loop is not None:
            self.server = loop.get_service_instance(FastAggregator)

    def __call__(self, key: Hashable, data: Any) -> Any:
        """Push one item under `key`, buffering it if the service is absent."""
        if self.server is None:
            self.try_find_server()

        if self.server is None:
            if key not in self.data_buffer:
                self.data_buffer[key] = list()

            self.data_buffer[key].append(data)
        else:
            if self.data_buffer:
                self.server.put_multiple(self.data_buffer)
                # Bug fix: drop the flushed buffer; previously it was kept and
                # re-sent on every subsequent call, duplicating all buffered items.
                self.data_buffer = dict()

            self.server.put_single(key, data)


FastAggregatorClient = FastAggregatorWriter
73class FastAggregatorRequest(ServiceRequest): 74 def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor: 75 """Will block coroutine untill result is ready 76 77 Args: 78 key (Hashable): _description_ 79 queued (bool, optional): When True - works like loadbalansers works. Otherwise gets same link to data as a first requester in a queue. Defaults to True. 80 81 Returns: 82 Any: _description_ 83 """ 84 return FastAggregatorWaitFor[None]()(key, queued) 85 86 def put_single(self, key: Hashable, data: Any) -> FastAggregatorPutSingle: 87 return FastAggregatorPutSingle[None]()(key, data) 88 89 def put_multiple(self, data: Dict[Hashable, List[Any]]) -> FastAggregatorPutMultiple: 90 return FastAggregatorPutMultiple[None]()(data)
74 def wait(self, key: Hashable, queued: bool = True) -> FastAggregatorWaitFor: 75 """Will block coroutine untill result is ready 76 77 Args: 78 key (Hashable): _description_ 79 queued (bool, optional): When True - works like loadbalansers works. Otherwise gets same link to data as a first requester in a queue. Defaults to True. 80 81 Returns: 82 Any: _description_ 83 """ 84 return FastAggregatorWaitFor[None]()(key, queued)
Will block the coroutine until the result is ready
Args: key (Hashable): _description_ queued (bool, optional): When True - works like load balancers do. Otherwise gets the same link to data as the first requester in the queue. Defaults to True.
Returns: Any: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default_service_type
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
46class FastAggregatorWaitFor(TypedServiceRequest[Any]): 47 def __call__(self, key: Hashable, queued: bool = True) -> 'FastAggregatorWaitFor': 48 """Will block coroutine untill result is ready 49 50 Args: 51 key (Hashable): _description_ 52 53 Returns: 54 ServiceRequest: _description_ 55 """ 56 return self._save(self.default__request__type__, key, queued)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
59class FastAggregatorPutSingle(TypedServiceRequest[None]): 60 default__request__type__ = 1 61 62 def __call__(self, key: Hashable, data: Any) -> 'FastAggregatorPutSingle': 63 return self._save(self.default__request__type__, key, data)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
66class FastAggregatorPutMultiple(TypedServiceRequest[None]): 67 default__request__type__ = 2 68 69 def __call__(self, data: Dict[Hashable, List[Any]]) -> 'FastAggregatorPutMultiple': 70 return self._save(self.default__request__type__, data)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
93class FastAggregator(DualImmediateProcessingServiceMixin, TypedService[Any]): 94 def __init__(self, loop: CoroSchedulerType): 95 super().__init__(loop) 96 self.data: Dict[Hashable, List[Any]] = dict() 97 self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict() 98 self._request_workers = { 99 0: self._on_wait, 100 1: self._on_put_single, 101 2: self._on_put_multiple, 102 } 103 104 def single_task_registration_or_immediate_processing_single( 105 self, key: Hashable) -> Tuple[bool, None, None]: 106 exception = None 107 result = None 108 try: 109 result = self.data[key] 110 del self.data[key] 111 except: 112 exception = get_exception() 113 114 return True, result, exception 115 116 def _on_wait(self, key: Hashable, queued: bool = True) -> Tuple[bool, None, None]: 117 if key not in self.requester_by_key: 118 self.requester_by_key[key] = deque() 119 120 if (not queued) and (self.requester_by_key[key]): 121 self.requester_by_key[key][0].add(self.current_caller_coro_info.coro_id) 122 else: 123 self.requester_by_key[key].append({self.current_caller_coro_info.coro_id}) 124 125 self.make_live() 126 return False, None, None 127 128 def _on_put_single(self, key: Hashable, data: Any) -> Tuple[bool, None, None]: 129 self.put_single(key, data) 130 return True, None, None 131 132 def _on_put_multiple(self, data: Dict[Hashable, List[Any]]) -> Tuple[bool, None, None]: 133 self.put_multiple(data) 134 return True, None, None 135 136 def put_single(self, key: Hashable, data: Any): 137 if key not in self.data: 138 self.data[key] = list() 139 140 if (key in self.requester_by_key) and self.requester_by_key[key]: 141 self.make_live() 142 143 self.data[key].append(data) 144 145 def put_multiple(self, data: Dict[Hashable, List[Any]]): 146 need_to_make_live = False 147 148 for key in data.keys(): 149 if key not in self.data: 150 self.data[key] = list() 151 152 if (key in self.requester_by_key) and self.requester_by_key[key]: 153 need_to_make_live = True 154 155 
self.data[key].extend(data[key]) 156 157 if need_to_make_live: 158 self.make_live() 159 160 def full_processing_iteration(self): 161 requester_by_key_bak = self.requester_by_key 162 possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys()) 163 for key in possible_keys: 164 key_requesters_coro_ids_bak = requester_by_key_bak[key] 165 if key_requesters_coro_ids_bak: 166 first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft() 167 data = self.data[key] 168 for first_requester_coro_id in first_requesters_coro_ids: 169 self.register_response(first_requester_coro_id, data, None) 170 171 del self.data[key] 172 173 if not key_requesters_coro_ids_bak: 174 del requester_by_key_bak[key] 175 176 self.make_dead() 177 178 def in_work(self) -> bool: 179 return self.thrifty_in_work(bool(self.requester_by_key))
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
94 def __init__(self, loop: CoroSchedulerType): 95 super().__init__(loop) 96 self.data: Dict[Hashable, List[Any]] = dict() 97 self.requester_by_key: Dict[Hashable, Deque[Set[CoroID]]] = dict() 98 self._request_workers = { 99 0: self._on_wait, 100 1: self._on_put_single, 101 2: self._on_put_multiple, 102 }
145 def put_multiple(self, data: Dict[Hashable, List[Any]]): 146 need_to_make_live = False 147 148 for key in data.keys(): 149 if key not in self.data: 150 self.data[key] = list() 151 152 if (key in self.requester_by_key) and self.requester_by_key[key]: 153 need_to_make_live = True 154 155 self.data[key].extend(data[key]) 156 157 if need_to_make_live: 158 self.make_live()
160 def full_processing_iteration(self): 161 requester_by_key_bak = self.requester_by_key 162 possible_keys = set(requester_by_key_bak.keys()) & set(self.data.keys()) 163 for key in possible_keys: 164 key_requesters_coro_ids_bak = requester_by_key_bak[key] 165 if key_requesters_coro_ids_bak: 166 first_requesters_coro_ids: Set[CoroID] = key_requesters_coro_ids_bak.popleft() 167 data = self.data[key] 168 for first_requester_coro_id in first_requesters_coro_ids: 169 self.register_response(first_requester_coro_id, data, None) 170 171 del self.data[key] 172 173 if not key_requesters_coro_ids_bak: 174 del requester_by_key_bak[key] 175 176 self.make_dead()
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
- single_task_registration_or_immediate_processing
- single_task_registration_or_immediate_processing_multiple
- single_task_registration_or_immediate_processing_single
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
187class FastAggregatorWriter: 188 def __init__(self) -> None: 189 self.server:Optional[FastAggregator] = None 190 self.data_buffer: Dict[Hashable, List[Any]] = dict() 191 192 def try_find_server(self): 193 loop = CoroScheduler.current_loop() 194 if loop is not None: 195 self.server = loop.get_service_instance(FastAggregator) 196 197 def __call__(self, key: Hashable, data: Any) -> Any: 198 if self.server is None: 199 self.try_find_server() 200 201 if self.server is None: 202 if key not in self.data_buffer: 203 self.data_buffer[key] = list() 204 205 self.data_buffer[key].append(data) 206 else: 207 if self.data_buffer: 208 self.server.put_multiple(self.data_buffer) 209 210 self.server.put_single(key, data)