cengal.parallel_execution.coroutines.coro_standard_services.communication.versions.v_0.communication

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['Communication']
 38
 39from cengal.parallel_execution.coroutines.coro_scheduler import *
 40from enum import Enum
 41from typing import Hashable, NoReturn, Dict, Tuple, Any
 42
 43
 44class Communication(Service):
 45
 46    class Requests(Enum):
 47        send_async = 0        # Coro will get control immediately after message sent
 48        send_sync = 1         # Coro will get control only after response will be received
 49        read_async = 2        # Coro will get control immediately even if input queue is empty
 50        read_sync = 3         # Coro will get control only if input queue is not empty or when new response will be
 51        # received
 52        named_send_async = 4  # Coro will get control immediately after message sent
 53        named_send_sync = 5   # Coro will get control only after response will be received
 54        named_read_async = 6  # Coro will get control immediately even if input queue is empty
 55        named_read_sync = 7   # Coro will get control only if input queue is not empty or when new response will be
 56        # received
 57
 58    class Request:
 59        def __init__(self):
 60            self.request_type = None  # type: Optional[int]
 61            self.args = None          # type: Optional[Tuple]
 62            self.kwargs = None        # type: Optional[Dict]
 63
 64        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 65            self._save(0, recipient_id, message)
 66            return self
 67
 68        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 69            self._save(1, recipient_id, message)
 70            return self
 71
 72        def read_async(self) -> 'Communication.Request':
 73            self._save(2)
 74            return self
 75
 76        def read_blocking(self) -> 'Communication.Request':
 77            self._save(3)
 78            return self
 79
 80        def send_async_named(
 81                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 82            self._save(4, sender_id, recipient_id, message)
 83            return self
 84
 85        def send_blocking_named(
 86                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 87            self._save(5, sender_id, recipient_id, message)
 88            return self
 89
 90        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
 91            self._save(6, recipient_id)
 92            return self
 93
 94        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
 95            self._save(7, recipient_id)
 96            return self
 97
 98        def _save(self, __request__type__: int, *args, **kwargs):
 99            self.request_type = __request__type__
100            self.args = args
101            self.kwargs = kwargs
102
103    def __init__(self, loop: CoroSchedulerType):
104        super(Communication, self).__init__(loop)
105
106    def single_task_registration_or_immediate_processing(
107            self, request: 'Communication.Requests', *args, **kwargs
108    ) -> Tuple[bool, Any]:
109        if 0 == request.request_type:
110            self.request_send_async(*args, **kwargs)
111            return True, None, None
112        elif 1 == request.request_type:
113            self.request_send_sync(*args, **kwargs)
114            return False, None, None
115        elif 2 == request.request_type:
116            result = self.request_read_async()
117            return True, result, None
118        elif 3 == request.request_type:
119            self.request_read_sync()
120            return False, None, None
121        elif 4 == request.request_type:
122            self.request_named_send_async(*args, **kwargs)
123            return True, None, None
124        elif 5 == request.request_type:
125            self.request_named_send_sync(*args, **kwargs)
126            return False, None, None
127        elif 6 == request.request_type:
128            result = self.request_named_read_async(*args, **kwargs)
129            return True, result, None
130        elif 7 == request.request_type:
131            self.request_named_read_sync(*args, **kwargs)
132            return False, None, None
133
134    def request_send_async(self, recipient_id: CoroID, message: Any) -> NoReturn:
135        raise NotImplementedError
136
137    def request_send_sync(self, recipient_id: CoroID, message: Any) -> NoReturn:
138        raise NotImplementedError
139
140    def request_read_async(self) -> Tuple[CoroID, Any]:
141        """
142        Will return tuple with sender coro ID and message
143
144        :rtype: Tuple[CoroID, Any]
145        """
146        raise NotImplementedError
147
148    def request_read_sync(self) -> NoReturn:
149        raise NotImplementedError
150
151    def request_named_send_async(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
152        raise NotImplementedError
153
154    def request_named_send_sync(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
155        raise NotImplementedError
156
157    def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
158        """
159        Will return tuple with sender ID and message
160
161        :rtype: Tuple[Hashable, Any]
162        """
163        raise NotImplementedError
164
165    def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
166        raise NotImplementedError
167
168    def full_processing_iteration(self):
169        pass
170
171    def in_work(self) -> bool:
172        return self.thrifty_in_work(False)
class Communication(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service):
 45class Communication(Service):
 46
 47    class Requests(Enum):
 48        send_async = 0        # Coro will get control immediately after message sent
 49        send_sync = 1         # Coro will get control only after response will be received
 50        read_async = 2        # Coro will get control immediately even if input queue is empty
 51        read_sync = 3         # Coro will get control only if input queue is not empty or when new response will be
 52        # received
 53        named_send_async = 4  # Coro will get control immediately after message sent
 54        named_send_sync = 5   # Coro will get control only after response will be received
 55        named_read_async = 6  # Coro will get control immediately even if input queue is empty
 56        named_read_sync = 7   # Coro will get control only if input queue is not empty or when new response will be
 57        # received
 58
 59    class Request:
 60        def __init__(self):
 61            self.request_type = None  # type: Optional[int]
 62            self.args = None          # type: Optional[Tuple]
 63            self.kwargs = None        # type: Optional[Dict]
 64
 65        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 66            self._save(0, recipient_id, message)
 67            return self
 68
 69        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 70            self._save(1, recipient_id, message)
 71            return self
 72
 73        def read_async(self) -> 'Communication.Request':
 74            self._save(2)
 75            return self
 76
 77        def read_blocking(self) -> 'Communication.Request':
 78            self._save(3)
 79            return self
 80
 81        def send_async_named(
 82                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 83            self._save(4, sender_id, recipient_id, message)
 84            return self
 85
 86        def send_blocking_named(
 87                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 88            self._save(5, sender_id, recipient_id, message)
 89            return self
 90
 91        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
 92            self._save(6, recipient_id)
 93            return self
 94
 95        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
 96            self._save(7, recipient_id)
 97            return self
 98
 99        def _save(self, __request__type__: int, *args, **kwargs):
100            self.request_type = __request__type__
101            self.args = args
102            self.kwargs = kwargs
103
104    def __init__(self, loop: CoroSchedulerType):
105        super(Communication, self).__init__(loop)
106
107    def single_task_registration_or_immediate_processing(
108            self, request: 'Communication.Requests', *args, **kwargs
109    ) -> Tuple[bool, Any]:
110        if 0 == request.request_type:
111            self.request_send_async(*args, **kwargs)
112            return True, None, None
113        elif 1 == request.request_type:
114            self.request_send_sync(*args, **kwargs)
115            return False, None, None
116        elif 2 == request.request_type:
117            result = self.request_read_async()
118            return True, result, None
119        elif 3 == request.request_type:
120            self.request_read_sync()
121            return False, None, None
122        elif 4 == request.request_type:
123            self.request_named_send_async(*args, **kwargs)
124            return True, None, None
125        elif 5 == request.request_type:
126            self.request_named_send_sync(*args, **kwargs)
127            return False, None, None
128        elif 6 == request.request_type:
129            result = self.request_named_read_async(*args, **kwargs)
130            return True, result, None
131        elif 7 == request.request_type:
132            self.request_named_read_sync(*args, **kwargs)
133            return False, None, None
134
135    def request_send_async(self, recipient_id: CoroID, message: Any) -> NoReturn:
136        raise NotImplementedError
137
138    def request_send_sync(self, recipient_id: CoroID, message: Any) -> NoReturn:
139        raise NotImplementedError
140
141    def request_read_async(self) -> Tuple[CoroID, Any]:
142        """
143        Will return tuple with sender coro ID and message
144
145        :rtype: Tuple[CoroID, Any]
146        """
147        raise NotImplementedError
148
149    def request_read_sync(self) -> NoReturn:
150        raise NotImplementedError
151
152    def request_named_send_async(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
153        raise NotImplementedError
154
155    def request_named_send_sync(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
156        raise NotImplementedError
157
158    def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
159        """
160        Will return tuple with sender ID and message
161
162        :rtype: Tuple[Hashable, Any]
163        """
164        raise NotImplementedError
165
166    def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
167        raise NotImplementedError
168
169    def full_processing_iteration(self):
170        pass
171
172    def in_work(self) -> bool:
173        return self.thrifty_in_work(False)
Communication( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
104    def __init__(self, loop: CoroSchedulerType):
105        super(Communication, self).__init__(loop)
def single_task_registration_or_immediate_processing( self, request: Communication.Requests, *args, **kwargs) -> Tuple[bool, Any]:
107    def single_task_registration_or_immediate_processing(
108            self, request: 'Communication.Requests', *args, **kwargs
109    ) -> Tuple[bool, Any]:
110        if 0 == request.request_type:
111            self.request_send_async(*args, **kwargs)
112            return True, None, None
113        elif 1 == request.request_type:
114            self.request_send_sync(*args, **kwargs)
115            return False, None, None
116        elif 2 == request.request_type:
117            result = self.request_read_async()
118            return True, result, None
119        elif 3 == request.request_type:
120            self.request_read_sync()
121            return False, None, None
122        elif 4 == request.request_type:
123            self.request_named_send_async(*args, **kwargs)
124            return True, None, None
125        elif 5 == request.request_type:
126            self.request_named_send_sync(*args, **kwargs)
127            return False, None, None
128        elif 6 == request.request_type:
129            result = self.request_named_read_async(*args, **kwargs)
130            return True, result, None
131        elif 7 == request.request_type:
132            self.request_named_read_sync(*args, **kwargs)
133            return False, None, None
def request_send_async(self, recipient_id: int, message: Any) -> NoReturn:
135    def request_send_async(self, recipient_id: CoroID, message: Any) -> NoReturn:
136        raise NotImplementedError
def request_send_sync(self, recipient_id: int, message: Any) -> NoReturn:
138    def request_send_sync(self, recipient_id: CoroID, message: Any) -> NoReturn:
139        raise NotImplementedError
def request_read_async(self) -> Tuple[int, Any]:
141    def request_read_async(self) -> Tuple[CoroID, Any]:
142        """
143        Will return tuple with sender coro ID and message
144
145        :rtype: Tuple[CoroID, Any]
146        """
147        raise NotImplementedError

Will return tuple with sender coro ID and message

:rtype: Tuple[CoroID, Any]

def request_read_sync(self) -> NoReturn:
149    def request_read_sync(self) -> NoReturn:
150        raise NotImplementedError
def request_named_send_async( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
152    def request_named_send_async(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
153        raise NotImplementedError
def request_named_send_sync( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
155    def request_named_send_sync(self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> NoReturn:
156        raise NotImplementedError
def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
158    def request_named_read_async(self, recipient_id: Hashable) -> Tuple[Hashable, Any]:
159        """
160        Will return tuple with sender ID and message
161
162        :rtype: Tuple[Hashable, Any]
163        """
164        raise NotImplementedError

Will return tuple with sender ID and message

:rtype: Tuple[Hashable, Any]

def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
166    def request_named_read_sync(self, recipient_id: Hashable) -> NoReturn:
167        raise NotImplementedError
def full_processing_iteration(self):
169    def full_processing_iteration(self):
170        pass
def in_work(self) -> bool:
172    def in_work(self) -> bool:
173        return self.thrifty_in_work(False)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
class Communication.Requests(enum.Enum):
47    class Requests(Enum):
48        send_async = 0        # Coro will get control immediately after message sent
49        send_sync = 1         # Coro will get control only after response will be received
50        read_async = 2        # Coro will get control immediately even if input queue is empty
51        read_sync = 3         # Coro will get control only if input queue is not empty or when new response will be
52        # received
53        named_send_async = 4  # Coro will get control immediately after message sent
54        named_send_sync = 5   # Coro will get control only after response will be received
55        named_read_async = 6  # Coro will get control immediately even if input queue is empty
56        named_read_sync = 7   # Coro will get control only if input queue is not empty or when new response will be
57        # received

An enumeration.

send_async = <Requests.send_async: 0>
send_sync = <Requests.send_sync: 1>
read_async = <Requests.read_async: 2>
read_sync = <Requests.read_sync: 3>
named_send_async = <Requests.named_send_async: 4>
named_send_sync = <Requests.named_send_sync: 5>
named_read_async = <Requests.named_read_async: 6>
named_read_sync = <Requests.named_read_sync: 7>
Inherited Members
enum.Enum
name
value
class Communication.Request:
 59    class Request:
 60        def __init__(self):
 61            self.request_type = None  # type: Optional[int]
 62            self.args = None          # type: Optional[Tuple]
 63            self.kwargs = None        # type: Optional[Dict]
 64
 65        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 66            self._save(0, recipient_id, message)
 67            return self
 68
 69        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
 70            self._save(1, recipient_id, message)
 71            return self
 72
 73        def read_async(self) -> 'Communication.Request':
 74            self._save(2)
 75            return self
 76
 77        def read_blocking(self) -> 'Communication.Request':
 78            self._save(3)
 79            return self
 80
 81        def send_async_named(
 82                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 83            self._save(4, sender_id, recipient_id, message)
 84            return self
 85
 86        def send_blocking_named(
 87                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
 88            self._save(5, sender_id, recipient_id, message)
 89            return self
 90
 91        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
 92            self._save(6, recipient_id)
 93            return self
 94
 95        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
 96            self._save(7, recipient_id)
 97            return self
 98
 99        def _save(self, __request__type__: int, *args, **kwargs):
100            self.request_type = __request__type__
101            self.args = args
102            self.kwargs = kwargs
request_type
args
kwargs
def send_async( self, recipient_id: int, message: Any) -> Communication.Request:
65        def send_async(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
66            self._save(0, recipient_id, message)
67            return self
def send_blocking( self, recipient_id: int, message: Any) -> Communication.Request:
69        def send_blocking(self, recipient_id: CoroID, message: Any) -> 'Communication.Request':
70            self._save(1, recipient_id, message)
71            return self
def read_async( self) -> Communication.Request:
73        def read_async(self) -> 'Communication.Request':
74            self._save(2)
75            return self
def read_blocking( self) -> Communication.Request:
77        def read_blocking(self) -> 'Communication.Request':
78            self._save(3)
79            return self
def send_async_named( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> Communication.Request:
81        def send_async_named(
82                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
83            self._save(4, sender_id, recipient_id, message)
84            return self
def send_blocking_named( self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> Communication.Request:
86        def send_blocking_named(
87                self, sender_id: Hashable, recipient_id: Hashable, message: Any) -> 'Communication.Request':
88            self._save(5, sender_id, recipient_id, message)
89            return self
def read_async_named( self, recipient_id: Hashable) -> Communication.Request:
91        def read_async_named(self, recipient_id: Hashable) -> 'Communication.Request':
92            self._save(6, recipient_id)
93            return self
def read_blocking_named( self, recipient_id: Hashable) -> Communication.Request:
95        def read_blocking_named(self, recipient_id: Hashable) -> 'Communication.Request':
96            self._save(7, recipient_id)
97            return self