cengal.parallel_execution.coroutines.coro_standard_services.communication.versions.v_0.communication
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Communication service skeleton: a request builder plus a dispatcher that routes
each request to a ``request_*`` handler. Every handler defined in this module
raises ``NotImplementedError``.
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['Communication']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from enum import Enum
from typing import Hashable, NoReturn, Dict, Tuple, Any


class Communication(Service):
    """
    Service skeleton for exchanging messages between coroutines.

    A coroutine describes the operation it wants with ``Communication.Request``;
    ``single_task_registration_or_immediate_processing`` routes it to the matching
    ``request_*`` handler. All handlers defined here raise ``NotImplementedError``.
    """

    class Requests(Enum):
        """Request kinds; the values equal the codes stored in ``Request.request_type``."""

        # The coroutine gets control back immediately after the message is sent.
        send_async = 0
        # The coroutine gets control back only after a response is received.
        send_sync = 1
        # The coroutine gets control back immediately, even if the input queue is empty.
        read_async = 2
        # The coroutine gets control back only if the input queue is not empty,
        # or when a new response is received.
        read_sync = 3
        # Named variant of send_async.
        named_send_async = 4
        # Named variant of send_sync.
        named_send_sync = 5
        # Named variant of read_async.
        named_read_async = 6
        # Named variant of read_sync.
        named_read_sync = 7

    class Request:
        """
        Fluent builder: each public method records a request code together with its
        arguments and returns ``self``.
        """

        def __init__(self):
            self.request_type = None  # type: Optional[int]
            self.args = None  # type: Optional[Tuple]
            self.kwargs = None  # type: Optional[Dict]

        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
            """Record code 0 (``Requests.send_async``) with ``(recipient_id, message)``."""
            self._save(0, recipient_id, message)
            return self

        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
            """Record code 1 (``Requests.send_sync``) with ``(recipient_id, message)``."""
            self._save(1, recipient_id, message)
            return self

        def read_async(self) -> 'Communication.Request':
            """Record code 2 (``Requests.read_async``) with no arguments."""
            self._save(2)
            return self

        def read_blocking(self) -> 'Communication.Request':
            """Record code 3 (``Requests.read_sync``) with no arguments."""
            self._save(3)
            return self

        def send_async_named(
                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
            """Record code 4 (``Requests.named_send_async``) with ``(sender_id, recipient_id, message)``."""
            self._save(4, sender_id, recipient_id, message)
            return self

        def send_blocking_named(
                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
            """Record code 5 (``Requests.named_send_sync``) with ``(sender_id, recipient_id, message)``."""
            self._save(5, sender_id, recipient_id, message)
            return self

        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
            """Record code 6 (``Requests.named_read_async``) with ``(recipient_id,)``."""
            self._save(6, recipient_id)
            return self

        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
            """Record code 7 (``Requests.named_read_sync``) with ``(recipient_id,)``."""
            self._save(7, recipient_id)
            return self

        def _save(self, __request__type__: int, *args, **kwargs):
            """Store the request code and its arguments on the instance."""
            # The unusual first parameter name keeps it from colliding with a key in **kwargs.
            self.request_type = __request__type__
            self.args = args
            self.kwargs = kwargs

    def __init__(self, loop: CoroSchedulerType):
        """Pass ``loop`` on to the ``Service`` initializer."""
        super(Communication, self).__init__(loop)

    def single_task_registration_or_immediate_processing(
            self, request: 'Communication.Request', *args, **kwargs
    ) -> Tuple[bool, Any, Any]:
        """
        Route ``request`` to the ``request_*`` handler selected by ``request.request_type``.

        The payload is taken from ``*args``/``**kwargs``; ``request.args`` and
        ``request.kwargs`` are not read here.
        NOTE(review): presumably the caller unpacks them before the call - confirm.

        :param request: object whose ``request_type`` is one of the codes 0..7
            (same numbering as ``Communication.Requests``)
        :param args: positional arguments forwarded to the handler (not forwarded
            to the unnamed reads, codes 2 and 3)
        :param kwargs: keyword arguments forwarded the same way
        :return: ``(True, result, None)`` for ``*_async`` codes, where ``result`` is the
            handler's return value for reads and ``None`` for sends;
            ``(False, None, None)`` for ``*_sync`` codes
        :raises ValueError: if ``request.request_type`` is not one of the known codes
        :rtype: Tuple[bool, Any, Any]
        """
        request_type = request.request_type
        if 0 == request_type:  # send_async
            self.request_send_async(*args, **kwargs)
            return True, None, None
        elif 1 == request_type:  # send_sync
            self.request_send_sync(*args, **kwargs)
            return False, None, None
        elif 2 == request_type:  # read_async
            result = self.request_read_async()
            return True, result, None
        elif 3 == request_type:  # read_sync
            self.request_read_sync()
            return False, None, None
        elif 4 == request_type:  # named_send_async
            self.request_named_send_async(*args, **kwargs)
            return True, None, None
        elif 5 == request_type:  # named_send_sync
            self.request_named_send_sync(*args, **kwargs)
            return False, None, None
        elif 6 == request_type:  # named_read_async
            result = self.request_named_read_async(*args, **kwargs)
            return True, result, None
        elif 7 == request_type:  # named_read_sync
            self.request_named_read_sync(*args, **kwargs)
            return False, None, None

        # Fail loudly instead of falling through and returning None, which would
        # break the declared tuple contract.
        raise ValueError('Unsupported Communication request type: {!r}'.format(request_type))

    def request_send_async(self, recipient_id: CoroID, message: Any) -> NoReturn:
        """Handler for code 0; not implemented here."""
        raise NotImplementedError

    def request_send_sync(self, recipient_id: CoroID, message: Any) -> NoReturn:
        """Handler for code 1; not implemented here."""
        raise NotImplementedError

    def request_read_async(self) -> Tuple[CoroID, Any]:
        """
        Handler for code 2; not implemented here.
        Implementations are expected to return a tuple of the sender coro ID and the message.

        :rtype: Tuple[CoroID, Any]
        """
        raise NotImplementedError

    def request_read_sync(self) -> NoReturn:
        """Handler for code 3; not implemented here."""
        raise NotImplementedError

    def request_named_send_async(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
        """Handler for code 4; not implemented here."""
        raise NotImplementedError

    def request_named_send_sync(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
        """Handler for code 5; not implemented here."""
        raise NotImplementedError

    def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
        """
        Handler for code 6; not implemented here.
        Implementations are expected to return a tuple of the sender ID and the message.

        :rtype: Tuple[Hashable, Any]
        """
        raise NotImplementedError

    def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
        """Handler for code 7; not implemented here."""
        raise NotImplementedError

    def full_processing_iteration(self):
        """Do nothing: this skeleton has no per-iteration work."""
        pass

    def in_work(self) -> bool:
        """Return the result of ``self.thrifty_in_work(False)``."""
        return self.thrifty_in_work(False)
class
Communication(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service):
class Communication(Service):
    """
    Service skeleton for exchanging messages between coroutines.

    A coroutine describes the operation it wants with ``Communication.Request``;
    ``single_task_registration_or_immediate_processing`` routes it to the matching
    ``request_*`` handler. All handlers defined here raise ``NotImplementedError``.
    """

    class Requests(Enum):
        """Request kinds; the values equal the codes stored in ``Request.request_type``."""

        # The coroutine gets control back immediately after the message is sent.
        send_async = 0
        # The coroutine gets control back only after a response is received.
        send_sync = 1
        # The coroutine gets control back immediately, even if the input queue is empty.
        read_async = 2
        # The coroutine gets control back only if the input queue is not empty,
        # or when a new response is received.
        read_sync = 3
        # Named variant of send_async.
        named_send_async = 4
        # Named variant of send_sync.
        named_send_sync = 5
        # Named variant of read_async.
        named_read_async = 6
        # Named variant of read_sync.
        named_read_sync = 7

    class Request:
        """
        Fluent builder: each public method records a request code together with its
        arguments and returns ``self``.
        """

        def __init__(self):
            self.request_type = None  # type: Optional[int]
            self.args = None  # type: Optional[Tuple]
            self.kwargs = None  # type: Optional[Dict]

        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
            """Record code 0 (``Requests.send_async``) with ``(recipient_id, message)``."""
            self._save(0, recipient_id, message)
            return self

        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
            """Record code 1 (``Requests.send_sync``) with ``(recipient_id, message)``."""
            self._save(1, recipient_id, message)
            return self

        def read_async(self) -> 'Communication.Request':
            """Record code 2 (``Requests.read_async``) with no arguments."""
            self._save(2)
            return self

        def read_blocking(self) -> 'Communication.Request':
            """Record code 3 (``Requests.read_sync``) with no arguments."""
            self._save(3)
            return self

        def send_async_named(
                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
            """Record code 4 (``Requests.named_send_async``) with ``(sender_id, recipient_id, message)``."""
            self._save(4, sender_id, recipient_id, message)
            return self

        def send_blocking_named(
                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
            """Record code 5 (``Requests.named_send_sync``) with ``(sender_id, recipient_id, message)``."""
            self._save(5, sender_id, recipient_id, message)
            return self

        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
            """Record code 6 (``Requests.named_read_async``) with ``(recipient_id,)``."""
            self._save(6, recipient_id)
            return self

        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
            """Record code 7 (``Requests.named_read_sync``) with ``(recipient_id,)``."""
            self._save(7, recipient_id)
            return self

        def _save(self, __request__type__: int, *args, **kwargs):
            """Store the request code and its arguments on the instance."""
            # The unusual first parameter name keeps it from colliding with a key in **kwargs.
            self.request_type = __request__type__
            self.args = args
            self.kwargs = kwargs

    def __init__(self, loop: CoroSchedulerType):
        """Pass ``loop`` on to the ``Service`` initializer."""
        super(Communication, self).__init__(loop)

    def single_task_registration_or_immediate_processing(
            self, request: 'Communication.Request', *args, **kwargs
    ) -> Tuple[bool, Any, Any]:
        """
        Route ``request`` to the ``request_*`` handler selected by ``request.request_type``.

        The payload is taken from ``*args``/``**kwargs``; ``request.args`` and
        ``request.kwargs`` are not read here.
        NOTE(review): presumably the caller unpacks them before the call - confirm.

        :param request: object whose ``request_type`` is one of the codes 0..7
            (same numbering as ``Communication.Requests``)
        :param args: positional arguments forwarded to the handler (not forwarded
            to the unnamed reads, codes 2 and 3)
        :param kwargs: keyword arguments forwarded the same way
        :return: ``(True, result, None)`` for ``*_async`` codes, where ``result`` is the
            handler's return value for reads and ``None`` for sends;
            ``(False, None, None)`` for ``*_sync`` codes
        :raises ValueError: if ``request.request_type`` is not one of the known codes
        :rtype: Tuple[bool, Any, Any]
        """
        request_type = request.request_type
        if 0 == request_type:  # send_async
            self.request_send_async(*args, **kwargs)
            return True, None, None
        elif 1 == request_type:  # send_sync
            self.request_send_sync(*args, **kwargs)
            return False, None, None
        elif 2 == request_type:  # read_async
            result = self.request_read_async()
            return True, result, None
        elif 3 == request_type:  # read_sync
            self.request_read_sync()
            return False, None, None
        elif 4 == request_type:  # named_send_async
            self.request_named_send_async(*args, **kwargs)
            return True, None, None
        elif 5 == request_type:  # named_send_sync
            self.request_named_send_sync(*args, **kwargs)
            return False, None, None
        elif 6 == request_type:  # named_read_async
            result = self.request_named_read_async(*args, **kwargs)
            return True, result, None
        elif 7 == request_type:  # named_read_sync
            self.request_named_read_sync(*args, **kwargs)
            return False, None, None

        # Fail loudly instead of falling through and returning None, which would
        # break the declared tuple contract.
        raise ValueError('Unsupported Communication request type: {!r}'.format(request_type))

    def request_send_async(self, recipient_id: CoroID, message: Any) -> NoReturn:
        """Handler for code 0; not implemented here."""
        raise NotImplementedError

    def request_send_sync(self, recipient_id: CoroID, message: Any) -> NoReturn:
        """Handler for code 1; not implemented here."""
        raise NotImplementedError

    def request_read_async(self) -> Tuple[CoroID, Any]:
        """
        Handler for code 2; not implemented here.
        Implementations are expected to return a tuple of the sender coro ID and the message.

        :rtype: Tuple[CoroID, Any]
        """
        raise NotImplementedError

    def request_read_sync(self) -> NoReturn:
        """Handler for code 3; not implemented here."""
        raise NotImplementedError

    def request_named_send_async(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
        """Handler for code 4; not implemented here."""
        raise NotImplementedError

    def request_named_send_sync(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
        """Handler for code 5; not implemented here."""
        raise NotImplementedError

    def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
        """
        Handler for code 6; not implemented here.
        Implementations are expected to return a tuple of the sender ID and the message.

        :rtype: Tuple[Hashable, Any]
        """
        raise NotImplementedError

    def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
        """Handler for code 7; not implemented here."""
        raise NotImplementedError

    def full_processing_iteration(self):
        """Do nothing: this skeleton has no per-iteration work."""
        pass

    def in_work(self) -> bool:
        """Return the result of ``self.thrifty_in_work(False)``."""
        return self.thrifty_in_work(False)
Communication( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
def
single_task_registration_or_immediate_processing( self, request: Communication.Requests, *args, **kwargs) -> Tuple[bool, Any]:
def single_task_registration_or_immediate_processing(
        self, request: 'Communication.Request', *args, **kwargs
) -> Tuple[bool, Any, Any]:
    """
    Route ``request`` to the ``request_*`` handler selected by ``request.request_type``.

    The payload is taken from ``*args``/``**kwargs``; ``request.args`` and
    ``request.kwargs`` are not read here.
    NOTE(review): presumably the caller unpacks them before the call - confirm.

    :param request: object whose ``request_type`` is one of the codes 0..7
        (same numbering as ``Communication.Requests``)
    :param args: positional arguments forwarded to the handler (not forwarded
        to the unnamed reads, codes 2 and 3)
    :param kwargs: keyword arguments forwarded the same way
    :return: ``(True, result, None)`` for ``*_async`` codes, where ``result`` is the
        handler's return value for reads and ``None`` for sends;
        ``(False, None, None)`` for ``*_sync`` codes
    :raises ValueError: if ``request.request_type`` is not one of the known codes
    :rtype: Tuple[bool, Any, Any]
    """
    request_type = request.request_type
    if 0 == request_type:  # send_async
        self.request_send_async(*args, **kwargs)
        return True, None, None
    elif 1 == request_type:  # send_sync
        self.request_send_sync(*args, **kwargs)
        return False, None, None
    elif 2 == request_type:  # read_async
        result = self.request_read_async()
        return True, result, None
    elif 3 == request_type:  # read_sync
        self.request_read_sync()
        return False, None, None
    elif 4 == request_type:  # named_send_async
        self.request_named_send_async(*args, **kwargs)
        return True, None, None
    elif 5 == request_type:  # named_send_sync
        self.request_named_send_sync(*args, **kwargs)
        return False, None, None
    elif 6 == request_type:  # named_read_async
        result = self.request_named_read_async(*args, **kwargs)
        return True, result, None
    elif 7 == request_type:  # named_read_sync
        self.request_named_read_sync(*args, **kwargs)
        return False, None, None

    # Fail loudly instead of falling through and returning None, which would
    # break the declared tuple contract.
    raise ValueError('Unsupported Communication request type: {!r}'.format(request_type))
def
request_read_async(self) -> Tuple[int, Any]:
def request_read_async(self) -> Tuple[CoroID, Any]:
    """
    Placeholder handler for the asynchronous read request; always raises here.

    Implementations are expected to return a tuple of the sender coro ID and
    the message.

    :rtype: Tuple[CoroID, Any]
    """
    raise NotImplementedError()
Returns a tuple containing the sender's coroutine ID and the message.
:rtype: Tuple[CoroID, Any]
def
request_named_send_async( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
def
request_named_send_sync( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
def
request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
    """
    Placeholder handler for the named asynchronous read request; always raises here.

    Implementations are expected to return a tuple of the sender ID and the
    message.

    :rtype: Tuple[Hashable, Any]
    """
    raise NotImplementedError()
Returns a tuple containing the sender ID and the message.
:rtype: Tuple[Hashable, Any]
def
in_work(self) -> bool:
Executed twice per iteration: once before and once after full_processing_iteration() runs.
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
class
Communication.Requests(enum.Enum):
class Requests(Enum):
    """Request kinds, numbered 0 through 7."""

    # The coroutine gets control back immediately after the message is sent.
    send_async = 0

    # The coroutine gets control back only after a response is received.
    send_sync = 1

    # The coroutine gets control back immediately, even if the input queue is empty.
    read_async = 2

    # The coroutine gets control back only if the input queue is not empty,
    # or when a new response is received.
    read_sync = 3

    # Named variant: control returns immediately after the message is sent.
    named_send_async = 4

    # Named variant: control returns only after a response is received.
    named_send_sync = 5

    # Named variant: control returns immediately, even if the input queue is empty.
    named_read_async = 6

    # Named variant: control returns only if the input queue is not empty,
    # or when a new response is received.
    named_read_sync = 7
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class
Communication.Request:
class Request:
    """
    Fluent builder for a communication request.

    Each public method stores a numeric request code (0..7) with its arguments
    through ``_save`` and returns ``self``.
    """

    def __init__(self):
        # Nothing is recorded until one of the builder methods is called.
        self.request_type = None  # type: Optional[int]
        self.args = None  # type: Optional[Tuple]
        self.kwargs = None  # type: Optional[Dict]

    def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
        """Record code 0 with ``(recipient_id, message)``."""
        self._save(0, recipient_id, message)
        return self

    def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
        """Record code 1 with ``(recipient_id, message)``."""
        self._save(1, recipient_id, message)
        return self

    def read_async(self) -> 'Communication.Request':
        """Record code 2 with no arguments."""
        self._save(2)
        return self

    def read_blocking(self) -> 'Communication.Request':
        """Record code 3 with no arguments."""
        self._save(3)
        return self

    def send_async_named(
            self, sender_id: Hashable, recipient_id: Hashable, message: Any
    ) -> 'Communication.Request':
        """Record code 4 with ``(sender_id, recipient_id, message)``."""
        self._save(4, sender_id, recipient_id, message)
        return self

    def send_blocking_named(
            self, sender_id: Hashable, recipient_id: Hashable, message: Any
    ) -> 'Communication.Request':
        """Record code 5 with ``(sender_id, recipient_id, message)``."""
        self._save(5, sender_id, recipient_id, message)
        return self

    def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
        """Record code 6 with ``(recipient_id,)``."""
        self._save(6, recipient_id)
        return self

    def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
        """Record code 7 with ``(recipient_id,)``."""
        self._save(7, recipient_id)
        return self

    def _save(self, __request__type__: int, *args, **kwargs):
        """Store the request code, positional arguments and keyword arguments."""
        # The unusual first parameter name keeps it from colliding with a key in **kwargs.
        self.request_type = __request__type__
        self.args = args
        self.kwargs = kwargs
def
send_async_named( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> Communication.Request:
def
send_blocking_named( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> Communication.Request: