cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18""" 19Module Docstring 20Docstrings: http://www.python.org/dev/peps/pep-0257/ 21""" 22 23__author__ = "ButenkoMS <gtalk@butenkoms.space>" 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 26__license__ = "Apache License, Version 2.0" 27__version__ = "4.4.1" 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 29__email__ = "gtalk@butenkoms.space" 30# __status__ = "Prototype" 31__status__ = "Development" 32# __status__ = "Production" 33 34 35from cengal.data_generation.id_generator import IDGenerator 36from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl 37from cengal.code_flow_control.smart_values import ValueCache, ValueExistence 38from weakref import ref, ReferenceType 39from typing import Optional, Set, Dict, List, Tuple, Callable, Awaitable, Sequence, Type 40 41 42# class NamedConnectionsTransport: 43# def disconnect(self): 44# raise NotImplementedError 45 46# def disconnected(self): 47# ... 48 49# def eof(self) -> bool: 50# ... 51 52 53class NamedConnectionsClient: 54 def __init__(self, server_id: bytes, external_data_full_size: ValueExistence) -> None: 55 self.id = None 56 self.server_id: bytes = server_id 57 self.server_ref: Optional[ReferenceType['NamedConnectionsServer']] = None 58 self.input_from_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size) 59 self.output_to_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size) 60 61 def callback__bind(self, server: 'NamedConnectionsServer'): 62 """Server can emit this callback 63 64 Args: 65 server (NamedConnectionsServer): _description_ 66 """ 67 self.server_ref = ref(server) 68 69 def callback__unbind(self): 70 """Server can emit this callback 71 """ 72 self.server_ref = None 73 74 def callback__data_to_client_added(self): 75 """Server can emit this callback 76 77 Raises: 78 NotImplementedError: _description_ 79 """ 80 raise NotImplementedError 81 82 def callback__server_ready_for_data(self): 83 """Server can emit this callback 84 85 Raises: 86 NotImplementedError: _description_ 87 """ 88 raise NotImplementedError 89 90 def callback__is_client_ready_for_data(self) -> bool: 91 """Server can emit this callback 92 93 Raises: 94 NotImplementedError: _description_ 95 96 Returns: 97 bool: _description_ 98 """ 99 raise NotImplementedError 100 101 def callback__stop(self): 102 """Server can emit this callback. No additional data from the server will be provided. No read from the server side will be made. 103 104 Raises: 105 NotImplementedError: _description_ 106 """ 107 raise NotImplementedError 108 109 def data_to_server_added(self): 110 """Client can call this method 111 """ 112 server = self.server_ref() 113 if server: 114 server.callback__data_to_server_added(self) 115 116 def is_server_ready_for_data(self) -> bool: 117 """Client can call this method 118 119 Returns: 120 bool: _description_ 121 """ 122 server = self.server_ref() 123 if server: 124 return server.callback__is_server_ready_for_data(self) 125 else: 126 return False 127 128 def client_ready_for_data(self): 129 """Client can call this method 130 """ 131 server = self.server_ref() 132 if server: 133 server.callback__client_ready_for_data(self) 134 135 def stop(self): 136 """Client can call this method. Server should not call it. Server should call NamedConnectionsClient.callback__stop() when wants to close connection 137 """ 138 server = self.server_ref() 139 if server: 140 server.callback__client_stopped(self) 141 142 143 144class NamedConnectionsServer: 145 # def __init__(self, server_id: bytes, named_connections_manager: Optional['NamedConnectionsManager'] = None) -> None: 146 # self.id: bytes = server_id 147 # if named_connections_manager: 148 # self.named_connections_manager: ReferenceType['NamedConnectionsManager'] = ref(named_connections_manager) 149 # else: 150 # self.named_connections_manager = None 151 152 # self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 153 154 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager') -> None: 155 self.id: bytes = server_id 156 self.named_connections_manager: ReferenceType['NamedConnectionsManager'] = ref(named_connections_manager) 157 self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 158 159 def bind(self, client: NamedConnectionsClient): 160 self.bind_clients[client.id] = client 161 client.callback__bind(self) 162 163 def unbind(self, client: NamedConnectionsClient): 164 try: 165 del self.bind_clients[client.id] 166 except KeyError: 167 pass 168 finally: 169 client.callback__unbind() 170 171 def callback__data_to_server_added(self, client: NamedConnectionsClient): 172 raise NotImplementedError 173 174 def callback__is_server_ready_for_data(self, client: NamedConnectionsClient) -> bool: 175 raise NotImplementedError 176 177 def callback__client_ready_for_data(self, client: NamedConnectionsClient): 178 raise NotImplementedError 179 180 def callback__client_stopped(self, client: NamedConnectionsClient): 181 """Client can emit this callback. No additional data from the client will be provided. No read from the client side will be made. 182 183 Raises: 184 NotImplementedError: _description_ 185 """ 186 raise NotImplementedError 187 188 189class NamedConnectionsManager: 190 def __init__(self, external_data_full_size: ValueExistence) -> None: 191 self.external_data_full_size: ValueExistence = external_data_full_size 192 self.gen_client_id: IDGenerator = IDGenerator() 193 self.unbind_clients: Dict[int, NamedConnectionsClient] = dict() 194 self.unbind_clients_per_server: Dict[str, Set[NamedConnectionsClient]] = dict() 195 self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 196 self.servers: Dict[bytes, NamedConnectionsServer] = dict() 197 198 def create_client(self, client_type: Type[NamedConnectionsClient], server_id: bytes) -> NamedConnectionsClient: 199 client: NamedConnectionsClient = client_type(server_id, self.external_data_full_size) 200 client_id = self.gen_client_id() 201 client.id = client_id 202 if server_id in self.servers: 203 self.bind_clients[client_id] = client 204 self.servers[server_id].bind(client) 205 else: 206 self._unbind_client_impl(client) 207 208 return client_id 209 210 def register_client(self, client: NamedConnectionsClient) -> int: 211 server_id: bytes = client.server_id 212 client_id = self.gen_client_id() 213 client.id = client_id 214 if server_id in self.servers: 215 self.bind_clients[client_id] = client 216 self.servers[server_id].bind(client) 217 else: 218 self.unbind_clients[client_id] = client 219 if server_id not in self.unbind_clients_per_server: 220 self.unbind_clients_per_server[server_id] = set() 221 222 self.unbind_clients_per_server[server_id].add(client) 223 224 return client_id 225 226 def register_server(self, server: NamedConnectionsServer): 227 server_id: bytes = server.id 228 if server_id in self.servers: 229 raise RuntimeError('NamedConnectionsServer already registered') 230 231 self.servers[server_id] = server 232 if server_id in self.unbind_clients_per_server: 233 unbind_clients = self.unbind_clients_per_server[server_id] 234 del self.unbind_clients_per_server[server_id] 235 for client in unbind_clients: 236 del self.unbind_clients[client.id] 237 self.bind_clients[client.id] = client 238 server.bind(client) 239 240 def unregister_server(self, server: NamedConnectionsServer): 241 server_clients = server.bind_clients.items() 242 for client_id, client in server_clients: 243 self.unbind_client(client) 244 245 try: 246 del self.servers[server.id] 247 except KeyError: 248 pass 249 250 def unregister_client(self, client: NamedConnectionsClient): 251 server = client.server_ref() 252 if server: 253 server.unbind(client) 254 255 client_id = client.id 256 self.bind_clients.pop(client_id, None) 257 self.unbind_clients.pop(client_id, None) 258 try: 259 self.unbind_clients_per_server[client.server_id].discard(client) 260 except KeyError: 261 pass 262 263 def unbind_client(self, client: NamedConnectionsClient): 264 server = client.server_ref() 265 if server: 266 server.unbind(client) 267 268 self._unbind_client_impl(client) 269 270 def _unbind_client_impl(self, client: NamedConnectionsClient): 271 client_id = client.id 272 server_id = client.server_id 273 self.bind_clients.pop(client_id, None) 274 self.unbind_clients[client_id] = client 275 if server_id not in self.unbind_clients_per_server: 276 self.unbind_clients_per_server[server_id] = set() 277 278 self.unbind_clients_per_server[server_id].add(client) 279 280 def rebind_client(self, client: NamedConnectionsClient, server_id: bytes): 281 """Force rebind client to another server 282 """ 283 self.servers[client.server_id].unbind(client) 284 client.server_id = server_id 285 self.servers[server_id].bind(client) 286 287 # def put_client_message_to_server_queue(self, message): 288 # ... 289 290 # def put_server_message_to_client_queue(self, message): 291 # ... 292 293 # def client_to_server_queue_len(self): 294 # ... 295 296 # def server_to_client_queue_len(self): 297 # ...
54class NamedConnectionsClient: 55 def __init__(self, server_id: bytes, external_data_full_size: ValueExistence) -> None: 56 self.id = None 57 self.server_id: bytes = server_id 58 self.server_ref: Optional[ReferenceType['NamedConnectionsServer']] = None 59 self.input_from_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size) 60 self.output_to_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size) 61 62 def callback__bind(self, server: 'NamedConnectionsServer'): 63 """Server can emit this callback 64 65 Args: 66 server (NamedConnectionsServer): _description_ 67 """ 68 self.server_ref = ref(server) 69 70 def callback__unbind(self): 71 """Server can emit this callback 72 """ 73 self.server_ref = None 74 75 def callback__data_to_client_added(self): 76 """Server can emit this callback 77 78 Raises: 79 NotImplementedError: _description_ 80 """ 81 raise NotImplementedError 82 83 def callback__server_ready_for_data(self): 84 """Server can emit this callback 85 86 Raises: 87 NotImplementedError: _description_ 88 """ 89 raise NotImplementedError 90 91 def callback__is_client_ready_for_data(self) -> bool: 92 """Server can emit this callback 93 94 Raises: 95 NotImplementedError: _description_ 96 97 Returns: 98 bool: _description_ 99 """ 100 raise NotImplementedError 101 102 def callback__stop(self): 103 """Server can emit this callback. No additional data from the server will be provided. No read from the server side will be made. 104 105 Raises: 106 NotImplementedError: _description_ 107 """ 108 raise NotImplementedError 109 110 def data_to_server_added(self): 111 """Client can call this method 112 """ 113 server = self.server_ref() 114 if server: 115 server.callback__data_to_server_added(self) 116 117 def is_server_ready_for_data(self) -> bool: 118 """Client can call this method 119 120 Returns: 121 bool: _description_ 122 """ 123 server = self.server_ref() 124 if server: 125 return server.callback__is_server_ready_for_data(self) 126 else: 127 return False 128 129 def client_ready_for_data(self): 130 """Client can call this method 131 """ 132 server = self.server_ref() 133 if server: 134 server.callback__client_ready_for_data(self) 135 136 def stop(self): 137 """Client can call this method. Server should not call it. Server should call NamedConnectionsClient.callback__stop() when wants to close connection 138 """ 139 server = self.server_ref() 140 if server: 141 server.callback__client_stopped(self)
55 def __init__(self, server_id: bytes, external_data_full_size: ValueExistence) -> None: 56 self.id = None 57 self.server_id: bytes = server_id 58 self.server_ref: Optional[ReferenceType['NamedConnectionsServer']] = None 59 self.input_from_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size) 60 self.output_to_client: FIFODequeWithLengthControl = FIFODequeWithLengthControl(external_data_full_size)
62 def callback__bind(self, server: 'NamedConnectionsServer'): 63 """Server can emit this callback 64 65 Args: 66 server (NamedConnectionsServer): _description_ 67 """ 68 self.server_ref = ref(server)
Server can emit this callback
Args: server (NamedConnectionsServer): _description_
75 def callback__data_to_client_added(self): 76 """Server can emit this callback 77 78 Raises: 79 NotImplementedError: _description_ 80 """ 81 raise NotImplementedError
Server can emit this callback
Raises: NotImplementedError: _description_
83 def callback__server_ready_for_data(self): 84 """Server can emit this callback 85 86 Raises: 87 NotImplementedError: _description_ 88 """ 89 raise NotImplementedError
Server can emit this callback
Raises: NotImplementedError: _description_
91 def callback__is_client_ready_for_data(self) -> bool: 92 """Server can emit this callback 93 94 Raises: 95 NotImplementedError: _description_ 96 97 Returns: 98 bool: _description_ 99 """ 100 raise NotImplementedError
Server can emit this callback
Raises: NotImplementedError: _description_
Returns: bool: _description_
102 def callback__stop(self): 103 """Server can emit this callback. No additional data from the server will be provided. No read from the server side will be made. 104 105 Raises: 106 NotImplementedError: _description_ 107 """ 108 raise NotImplementedError
Server can emit this callback. No additional data from the server will be provided. No read from the server side will be made.
Raises: NotImplementedError: _description_
110 def data_to_server_added(self): 111 """Client can call this method 112 """ 113 server = self.server_ref() 114 if server: 115 server.callback__data_to_server_added(self)
Client can call this method
117 def is_server_ready_for_data(self) -> bool: 118 """Client can call this method 119 120 Returns: 121 bool: _description_ 122 """ 123 server = self.server_ref() 124 if server: 125 return server.callback__is_server_ready_for_data(self) 126 else: 127 return False
Client can call this method
Returns: bool: _description_
129 def client_ready_for_data(self): 130 """Client can call this method 131 """ 132 server = self.server_ref() 133 if server: 134 server.callback__client_ready_for_data(self)
Client can call this method
136 def stop(self): 137 """Client can call this method. Server should not call it. Server should call NamedConnectionsClient.callback__stop() when wants to close connection 138 """ 139 server = self.server_ref() 140 if server: 141 server.callback__client_stopped(self)
Client can call this method. Server should not call it. Server should call NamedConnectionsClient.callback__stop() when wants to close connection
145class NamedConnectionsServer: 146 # def __init__(self, server_id: bytes, named_connections_manager: Optional['NamedConnectionsManager'] = None) -> None: 147 # self.id: bytes = server_id 148 # if named_connections_manager: 149 # self.named_connections_manager: ReferenceType['NamedConnectionsManager'] = ref(named_connections_manager) 150 # else: 151 # self.named_connections_manager = None 152 153 # self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 154 155 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager') -> None: 156 self.id: bytes = server_id 157 self.named_connections_manager: ReferenceType['NamedConnectionsManager'] = ref(named_connections_manager) 158 self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 159 160 def bind(self, client: NamedConnectionsClient): 161 self.bind_clients[client.id] = client 162 client.callback__bind(self) 163 164 def unbind(self, client: NamedConnectionsClient): 165 try: 166 del self.bind_clients[client.id] 167 except KeyError: 168 pass 169 finally: 170 client.callback__unbind() 171 172 def callback__data_to_server_added(self, client: NamedConnectionsClient): 173 raise NotImplementedError 174 175 def callback__is_server_ready_for_data(self, client: NamedConnectionsClient) -> bool: 176 raise NotImplementedError 177 178 def callback__client_ready_for_data(self, client: NamedConnectionsClient): 179 raise NotImplementedError 180 181 def callback__client_stopped(self, client: NamedConnectionsClient): 182 """Client can emit this callback. No additional data from the client will be provided. No read from the client side will be made. 183 184 Raises: 185 NotImplementedError: _description_ 186 """ 187 raise NotImplementedError
155 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager') -> None: 156 self.id: bytes = server_id 157 self.named_connections_manager: ReferenceType['NamedConnectionsManager'] = ref(named_connections_manager) 158 self.bind_clients: Dict[int, NamedConnectionsClient] = dict()
181 def callback__client_stopped(self, client: NamedConnectionsClient): 182 """Client can emit this callback. No additional data from the client will be provided. No read from the client side will be made. 183 184 Raises: 185 NotImplementedError: _description_ 186 """ 187 raise NotImplementedError
Client can emit this callback. No additional data from the client will be provided. No read from the client side will be made.
Raises: NotImplementedError: _description_
190class NamedConnectionsManager: 191 def __init__(self, external_data_full_size: ValueExistence) -> None: 192 self.external_data_full_size: ValueExistence = external_data_full_size 193 self.gen_client_id: IDGenerator = IDGenerator() 194 self.unbind_clients: Dict[int, NamedConnectionsClient] = dict() 195 self.unbind_clients_per_server: Dict[str, Set[NamedConnectionsClient]] = dict() 196 self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 197 self.servers: Dict[bytes, NamedConnectionsServer] = dict() 198 199 def create_client(self, client_type: Type[NamedConnectionsClient], server_id: bytes) -> NamedConnectionsClient: 200 client: NamedConnectionsClient = client_type(server_id, self.external_data_full_size) 201 client_id = self.gen_client_id() 202 client.id = client_id 203 if server_id in self.servers: 204 self.bind_clients[client_id] = client 205 self.servers[server_id].bind(client) 206 else: 207 self._unbind_client_impl(client) 208 209 return client_id 210 211 def register_client(self, client: NamedConnectionsClient) -> int: 212 server_id: bytes = client.server_id 213 client_id = self.gen_client_id() 214 client.id = client_id 215 if server_id in self.servers: 216 self.bind_clients[client_id] = client 217 self.servers[server_id].bind(client) 218 else: 219 self.unbind_clients[client_id] = client 220 if server_id not in self.unbind_clients_per_server: 221 self.unbind_clients_per_server[server_id] = set() 222 223 self.unbind_clients_per_server[server_id].add(client) 224 225 return client_id 226 227 def register_server(self, server: NamedConnectionsServer): 228 server_id: bytes = server.id 229 if server_id in self.servers: 230 raise RuntimeError('NamedConnectionsServer already registered') 231 232 self.servers[server_id] = server 233 if server_id in self.unbind_clients_per_server: 234 unbind_clients = self.unbind_clients_per_server[server_id] 235 del self.unbind_clients_per_server[server_id] 236 for client in unbind_clients: 237 del self.unbind_clients[client.id] 238 self.bind_clients[client.id] = client 239 server.bind(client) 240 241 def unregister_server(self, server: NamedConnectionsServer): 242 server_clients = server.bind_clients.items() 243 for client_id, client in server_clients: 244 self.unbind_client(client) 245 246 try: 247 del self.servers[server.id] 248 except KeyError: 249 pass 250 251 def unregister_client(self, client: NamedConnectionsClient): 252 server = client.server_ref() 253 if server: 254 server.unbind(client) 255 256 client_id = client.id 257 self.bind_clients.pop(client_id, None) 258 self.unbind_clients.pop(client_id, None) 259 try: 260 self.unbind_clients_per_server[client.server_id].discard(client) 261 except KeyError: 262 pass 263 264 def unbind_client(self, client: NamedConnectionsClient): 265 server = client.server_ref() 266 if server: 267 server.unbind(client) 268 269 self._unbind_client_impl(client) 270 271 def _unbind_client_impl(self, client: NamedConnectionsClient): 272 client_id = client.id 273 server_id = client.server_id 274 self.bind_clients.pop(client_id, None) 275 self.unbind_clients[client_id] = client 276 if server_id not in self.unbind_clients_per_server: 277 self.unbind_clients_per_server[server_id] = set() 278 279 self.unbind_clients_per_server[server_id].add(client) 280 281 def rebind_client(self, client: NamedConnectionsClient, server_id: bytes): 282 """Force rebind client to another server 283 """ 284 self.servers[client.server_id].unbind(client) 285 client.server_id = server_id 286 self.servers[server_id].bind(client) 287 288 # def put_client_message_to_server_queue(self, message): 289 # ... 290 291 # def put_server_message_to_client_queue(self, message): 292 # ... 293 294 # def client_to_server_queue_len(self): 295 # ... 296 297 # def server_to_client_queue_len(self): 298 # ...
191 def __init__(self, external_data_full_size: ValueExistence) -> None: 192 self.external_data_full_size: ValueExistence = external_data_full_size 193 self.gen_client_id: IDGenerator = IDGenerator() 194 self.unbind_clients: Dict[int, NamedConnectionsClient] = dict() 195 self.unbind_clients_per_server: Dict[str, Set[NamedConnectionsClient]] = dict() 196 self.bind_clients: Dict[int, NamedConnectionsClient] = dict() 197 self.servers: Dict[bytes, NamedConnectionsServer] = dict()
199 def create_client(self, client_type: Type[NamedConnectionsClient], server_id: bytes) -> NamedConnectionsClient: 200 client: NamedConnectionsClient = client_type(server_id, self.external_data_full_size) 201 client_id = self.gen_client_id() 202 client.id = client_id 203 if server_id in self.servers: 204 self.bind_clients[client_id] = client 205 self.servers[server_id].bind(client) 206 else: 207 self._unbind_client_impl(client) 208 209 return client_id
211 def register_client(self, client: NamedConnectionsClient) -> int: 212 server_id: bytes = client.server_id 213 client_id = self.gen_client_id() 214 client.id = client_id 215 if server_id in self.servers: 216 self.bind_clients[client_id] = client 217 self.servers[server_id].bind(client) 218 else: 219 self.unbind_clients[client_id] = client 220 if server_id not in self.unbind_clients_per_server: 221 self.unbind_clients_per_server[server_id] = set() 222 223 self.unbind_clients_per_server[server_id].add(client) 224 225 return client_id
227 def register_server(self, server: NamedConnectionsServer): 228 server_id: bytes = server.id 229 if server_id in self.servers: 230 raise RuntimeError('NamedConnectionsServer already registered') 231 232 self.servers[server_id] = server 233 if server_id in self.unbind_clients_per_server: 234 unbind_clients = self.unbind_clients_per_server[server_id] 235 del self.unbind_clients_per_server[server_id] 236 for client in unbind_clients: 237 del self.unbind_clients[client.id] 238 self.bind_clients[client.id] = client 239 server.bind(client)
251 def unregister_client(self, client: NamedConnectionsClient): 252 server = client.server_ref() 253 if server: 254 server.unbind(client) 255 256 client_id = client.id 257 self.bind_clients.pop(client_id, None) 258 self.unbind_clients.pop(client_id, None) 259 try: 260 self.unbind_clients_per_server[client.server_id].discard(client) 261 except KeyError: 262 pass