cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.remote_nodes
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['RemoteNodes', 'RemoteNodesRequest'] 20 21from enum import Enum 22from cengal.parallel_execution.coroutines.coro_scheduler import * 23from cengal.parallel_execution.coroutines.coro_tools.await_coro import * 24from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import * 25from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority 26from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 27from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import * 28from cengal.file_system.file_manager import path_relative_to_current_dir 29from cengal.time_management.cpu_clock_cycles import perf_counter 30from cengal.data_manipulation.serialization import * 31from typing import Hashable, Tuple, List, Any, Dict, Callable, Type 32from cengal.introspection.inspect import get_exception, entity_owning_module_importable_str, entity_owning_module_info_and_owning_path, entity_properties 33from cengal.io.core.memory_management import IOCoreMemoryManagement 34from cengal.parallel_execution.asyncio.efficient_streams import StreamManagerIOCoreMemoryManagement, TcpStreamManager, UdpStreamManager, StreamManagerAbstract 35from cengal.code_flow_control.smart_values import ValueExistence 36from cengal.io.named_connections.named_connections_manager import NamedConnectionsManager 37from cengal.code_flow_control.args_manager import number_of_provided_args 38from cengal.data_manipulation.serialization import Serializer, Serializers, best_serializer_for_standard_data 39from cengal.code_flow_control.args_manager import find_arg_position_and_value, UnknownArgumentError 40from cengal.data_generation.id_generator import IDGenerator, GeneratorType 41from cengal.system import PLATFORM_NAME, PYTHON_VERSION 42from importlib import import_module 43import sys 44import os 45import asyncio 46import lmdb 47 48from .exceptions import * 49from .commands import * 50from .class_info import * 51from .request_class_info import * 52from .remote_node import * 53from .serializers import * 54 55 56""" 57Module Docstring 58Docstrings: http://www.python.org/dev/peps/pep-0257/ 59""" 60 61__author__ = "ButenkoMS <gtalk@butenkoms.space>" 62__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 63__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 64__license__ = "Apache License, Version 2.0" 65__version__ = "4.4.1" 66__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 67__email__ = "gtalk@butenkoms.space" 68# __status__ = "Prototype" 69__status__ = "Development" 70# __status__ = "Production" 71 72 73class RemoteNodesRequest(ServiceRequest): 74 def start(self, host=None, port=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest: 75 return self._save(0, host, port) 76 def stop(self) -> ServiceRequest: 77 return self._save(1) 78 def connect(self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest: 79 return self._save(2, host, port, connection_id_alias) 80 def disconnect(self, connection_id) -> ServiceRequest: 81 return self._save(3, connection_id) 82 83 84class State(Enum): 85 start_initiated = 0 86 started = 1 87 stop_initiated = 2 88 stopped = 3 89 90 91def is_current_platform(current_platform_name, foreign_platform_name): 92 return current_platform_name == foreign_platform_name 93 94 95class RemoteNodes(DualImmediateProcessingServiceMixin, Service, EntityStatsMixin): 96 def __init__(self, loop: CoroSchedulerType): 97 super(RemoteNodes, self).__init__(loop) 98 self._request_workers = { 99 0: self._on_start, 100 1: self._on_stop, 101 2: self._on_connect, 102 2: self._on_disconnect, 103 } 104 self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}' 105 self.state: State = State.stopped 106 self.servers: Dict[Hashable, RemoteServer] = dict() 107 self.clients: Dict[Hashable, RemoteClient] = dict() 108 109 self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data(( 110 DataFormats.any, 111 Tags.current_platform, 112 Tags.deep, 113 Tags.can_use_custom_types, 114 Tags.can_use_bytes, 115 Tags.can_use_set, 116 Tags.decode_str_as_str, 117 Tags.decode_bytes_as_bytes, 118 Tags.decode_tuple_as_tuple, 119 Tags.decode_list_as_list, 120 ), TestDataType.deep_large, 0.1) 121 print(self.serializer__current_platform__custom_types.serializer) 122 123 self.serializer__current_platform: Serializer = best_serializer_for_standard_data(( 124 DataFormats.any, 125 Tags.deep, 126 Tags.can_use_set, 127 Tags.can_use_bytes, 128 Tags.decode_str_as_str, 129 Tags.decode_bytes_as_bytes, 130 Tags.decode_tuple_as_tuple, 131 Tags.decode_list_as_list, 132 ), TestDataType.deep_large, 0.1) 133 print(self.serializer__current_platform.serializer) 134 135 self.serializer__multi_platform: Serializer = best_serializer_for_standard_data(( 136 DataFormats.any, 137 Tags.deep, 138 Tags.multi_platform, 139 Tags.can_use_bytes, 140 Tags.decode_str_as_str, 141 Tags.decode_list_as_list, 142 ), TestDataType.deep_large, 0.1) 143 print(self.serializer__multi_platform.serializer) 144 145 self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data(( 146 DataFormats.any, 147 Tags.deep, 148 Tags.multi_platform, 149 Tags.can_use_bytes, 150 ), TestDataType.small, 0.1) 151 print(self.serializer__multi_platform_fast.serializer) 152 153 self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data(( 154 DataFormats.json, 155 Tags.deep, 156 Tags.multi_platform, 157 ), TestDataType.small, 0.1) 158 print(self.serializer__multi_platform_fast.serializer) 159 160 def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]: 161 """_summary_ 162 163 Args: 164 foreign_platform_name (_type_): _description_ 165 data (_type_): _description_ 166 167 Raises: 168 MessageDataCanNotBeSerializedForRequestedNodeError: _description_ 169 170 Returns: 171 Tuple[Serializer, SerializerID, bytes]: _description_ 172 """ 173 current_platform = is_current_platform(self.platform_name, foreign_platform_name) 174 if current_platform: 175 serializer_id: SerializerID = SerializerID.current_platform 176 serializer: Serializer = self.serializer__current_platform 177 else: 178 serializer_id = SerializerID.multi_platform 179 serializer = self.serializer__multi_platform 180 181 try: 182 return serializer, serializer_id, serializer.dumps(data) 183 except: 184 if not current_platform: 185 raise MessageDataCanNotBeSerializedForRequestedNodeError 186 187 serializer_id = SerializerID.current_platform__custom_types 188 serializer = self.serializer__current_platform__custom_types 189 return serializer, serializer_id, serializer.dumps(data) 190 191 def deserialize(self, serializer_id: int, data: bytes) -> Any: 192 """_summary_ 193 194 Args: 195 serializer_id (int): _description_ 196 data (bytes): _description_ 197 198 Raises: 199 UnknownSerializerIDError: _description_ 200 201 Returns: 202 Any: _description_ 203 """ 204 if SerializerID.multi_platform_fast.value == serializer_id: 205 serializer: Serializer = self.serializer__multi_platform_fast 206 elif SerializerID.multi_platform.value == serializer_id: 207 serializer = self.serializer__multi_platform 208 elif SerializerID.current_platform.value == serializer_id: 209 serializer = self.serializer__current_platform 210 elif SerializerID.current_platform__custom_types.value == serializer_id: 211 serializer = self.serializer__current_platform__custom_types 212 else: 213 raise UnknownSerializerIDError(serializer_id) 214 215 return serializer.loads(data) 216 217 def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes: 218 """_summary_ 219 220 Args: 221 server_id (Hashable): _description_ 222 args (_type_): _description_ 223 kwargs (_type_): _description_ 224 225 Raises: 226 MessageCanNotBeEmptyError: _description_ 227 RuntimeError: _description_ 228 RuntimeError: _description_ 229 230 Returns: 231 bytes: _description_ 232 """ 233 result: bytes = None 234 remote_server: RemoteServer = self.servers[server_id] 235 remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes 236 remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes 237 try: 238 is_raw_request: bool = False 239 service_type_param_name: str = 'service_type' 240 request_param_name: str = 'request' 241 if 0 == number_of_provided_args(args, kwargs): 242 # an error 243 raise MessageCanNotBeEmptyError 244 elif 2 == number_of_provided_args(args, kwargs): 245 service_type: Type[Service] = None 246 request_obj: Request = None 247 params = {service_type_param_name: 0, request_param_name: 1} 248 service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 249 if service_type_param_found: 250 if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service): 251 service_type = service_type_param_value 252 else: 253 is_raw_request = True 254 else: 255 is_raw_request = True 256 257 if not is_raw_request: 258 request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs) 259 if request_param_found: 260 if isinstance(request_param_value, Request): 261 request_obj = request_param_value 262 else: 263 is_raw_request = True 264 else: 265 is_raw_request = True 266 267 if not is_raw_request: 268 if (service_type is None) or (request_obj is None): 269 raise RuntimeError # executed only if there is some bug in code 270 271 # check service existance 272 new_service: bool = False 273 if service_type not in remotely_registered_service_classes: 274 new_service = True 275 remote_server.register_service_class(service_type) 276 277 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 278 messages: List[bytes] = list() 279 280 # register new service 281 if new_service: 282 sys_data: Dict = local_service_class_info() 283 request: Dict = { 284 Fields.command_name.value: Commands.declare_service_class.value, 285 Fields.request_id.value: remote_server.gen_request_id(), 286 Fields.data.value: sys_data, 287 } 288 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 289 messages.append(serialized_request) 290 291 request_obj_type: Type[Request] = type(request_obj) 292 293 # check request type existance 294 new_request_type: bool = False 295 if request_obj_type not in remotely_registered_request_classes: 296 new_request_type = True 297 remote_server.register_request_class(request_obj) 298 299 local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type] 300 301 # register new request type 302 if new_request_type: 303 sys_data: Dict = local_request_class_info() 304 request: Dict = { 305 Fields.command_name.value: Commands.declare_service_request_class.value, 306 Fields.request_id.value: remote_server.gen_request_id(), 307 Fields.data.value: sys_data, 308 } 309 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 310 messages.append(serialized_request) 311 312 # - actual request to service 313 request_data: Dict = local_request_class_info.request_to_data(request_obj) 314 request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id 315 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 316 request: Dict = { 317 Fields.command_name.value: Commands.send_service_request_with_request_class.value, 318 Fields.request_id.value: remote_server.gen_request_id(), 319 Fields.is_response_required.value: 1, 320 Fields.data_serializer_id.value: serializer_id.value, 321 Fields.data.value: request_data, 322 } 323 324 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 325 messages.append(serialized_request) 326 result = b''.join(messages) 327 else: 328 params = {service_type_param_name: 0} 329 found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 330 if found: 331 if pos is None: 332 del kwargs[service_type_param_name] 333 else: 334 args = args[1:] 335 336 if isinstance(value, type) and issubclass(value, Service): 337 service_type: Type[Service] = value 338 339 # check service existance 340 new_service: bool = False 341 if service_type not in remotely_registered_service_classes: 342 new_service = True 343 remote_server.register_service_class(service_type) 344 345 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 346 347 messages: List[bytes] = list() 348 349 # register new service 350 if new_service: 351 sys_data: Dict = local_service_class_info() 352 request: Dict = { 353 Fields.command_name.value: Commands.declare_service_class.value, 354 Fields.request_id.value: remote_server.gen_request_id(), 355 Fields.data.value: sys_data, 356 } 357 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 358 messages.append(serialized_request) 359 360 # actual request to service 361 request_data: Dict = { 362 CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id, 363 } 364 explicit_serializer_required: bool = False 365 if args: 366 explicit_serializer_required = True 367 request_data[CommandDataFieldsServiceRequest.args.value] = args 368 369 if kwargs: 370 explicit_serializer_required = True 371 request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs 372 373 if explicit_serializer_required: 374 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 375 376 request: Dict = { 377 Fields.command_name.value: Commands.send_service_request.value, 378 Fields.request_id.value: remote_server.gen_request_id(), 379 Fields.is_response_required.value: 1, 380 Fields.data.value: request_data, 381 } 382 if explicit_serializer_required: 383 request[Fields.data_serializer_id.value] = serializer_id.value 384 385 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 386 messages.append(serialized_request) 387 result = b''.join(messages) 388 else: 389 is_raw_request = True 390 else: 391 is_raw_request = True 392 393 if is_raw_request: 394 # raw request 395 request_data: Dict = { 396 CommandDataFieldsRequest.args.value: args, 397 CommandDataFieldsRequest.kwargs.value: kwargs, 398 } 399 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 400 401 request: Dict = { 402 Fields.command_name.value: Commands.send_request.value, 403 Fields.request_id.value: remote_server.gen_request_id(), 404 Fields.is_response_required.value: 1, 405 Fields.data_serializer_id.value: serializer_id.value, 406 Fields.data.value: request_data, 407 } 408 409 result = self.serializer__multi_platform_fast.dumps(request) 410 except UnknownArgumentError: 411 raise RuntimeError # executed only if there is some bug in code 412 413 return result 414 415 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 416 return type(self).__name__, { 417 'instances num': len(self.instances), 418 } 419 420 def single_task_registration_or_immediate_processing_single( 421 self, *args, **kwargs 422 ) -> Tuple[bool, Optional[CoroID], Any]: 423 number_of_args: int = number_of_provided_args(args, kwargs) 424 if 1 == number_of_args: 425 return self._on_get(*args, **kwargs) 426 elif 2 == number_of_args: 427 return self._on_set(*args, **kwargs) 428 else: 429 return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}') 430 431 def full_processing_iteration(self): 432 results_bak = self.results 433 self.results = type(results_bak)() 434 for waiter_coro_id, instance in results_bak.items(): 435 self.register_response(waiter_coro_id, instance) 436 437 self.make_dead() 438 439 def in_work(self) -> bool: 440 return self.thrifty_in_work(bool(self.results)) 441 442 def _on_start(self, host=None, port=None): 443 if State.stopped != self.state: 444 return True, None, RuntimeError('Already started or not yet stopped.') 445 else: 446 ... 447 448 return True, None, None 449 450 451RemoteNodesRequest.default_service_type = RemoteNodes
96class RemoteNodes(DualImmediateProcessingServiceMixin, Service, EntityStatsMixin): 97 def __init__(self, loop: CoroSchedulerType): 98 super(RemoteNodes, self).__init__(loop) 99 self._request_workers = { 100 0: self._on_start, 101 1: self._on_stop, 102 2: self._on_connect, 103 2: self._on_disconnect, 104 } 105 self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}' 106 self.state: State = State.stopped 107 self.servers: Dict[Hashable, RemoteServer] = dict() 108 self.clients: Dict[Hashable, RemoteClient] = dict() 109 110 self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data(( 111 DataFormats.any, 112 Tags.current_platform, 113 Tags.deep, 114 Tags.can_use_custom_types, 115 Tags.can_use_bytes, 116 Tags.can_use_set, 117 Tags.decode_str_as_str, 118 Tags.decode_bytes_as_bytes, 119 Tags.decode_tuple_as_tuple, 120 Tags.decode_list_as_list, 121 ), TestDataType.deep_large, 0.1) 122 print(self.serializer__current_platform__custom_types.serializer) 123 124 self.serializer__current_platform: Serializer = best_serializer_for_standard_data(( 125 DataFormats.any, 126 Tags.deep, 127 Tags.can_use_set, 128 Tags.can_use_bytes, 129 Tags.decode_str_as_str, 130 Tags.decode_bytes_as_bytes, 131 Tags.decode_tuple_as_tuple, 132 Tags.decode_list_as_list, 133 ), TestDataType.deep_large, 0.1) 134 print(self.serializer__current_platform.serializer) 135 136 self.serializer__multi_platform: Serializer = best_serializer_for_standard_data(( 137 DataFormats.any, 138 Tags.deep, 139 Tags.multi_platform, 140 Tags.can_use_bytes, 141 Tags.decode_str_as_str, 142 Tags.decode_list_as_list, 143 ), TestDataType.deep_large, 0.1) 144 print(self.serializer__multi_platform.serializer) 145 146 self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data(( 147 DataFormats.any, 148 Tags.deep, 149 Tags.multi_platform, 150 Tags.can_use_bytes, 151 ), TestDataType.small, 0.1) 152 print(self.serializer__multi_platform_fast.serializer) 153 154 self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data(( 155 DataFormats.json, 156 Tags.deep, 157 Tags.multi_platform, 158 ), TestDataType.small, 0.1) 159 print(self.serializer__multi_platform_fast.serializer) 160 161 def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]: 162 """_summary_ 163 164 Args: 165 foreign_platform_name (_type_): _description_ 166 data (_type_): _description_ 167 168 Raises: 169 MessageDataCanNotBeSerializedForRequestedNodeError: _description_ 170 171 Returns: 172 Tuple[Serializer, SerializerID, bytes]: _description_ 173 """ 174 current_platform = is_current_platform(self.platform_name, foreign_platform_name) 175 if current_platform: 176 serializer_id: SerializerID = SerializerID.current_platform 177 serializer: Serializer = self.serializer__current_platform 178 else: 179 serializer_id = SerializerID.multi_platform 180 serializer = self.serializer__multi_platform 181 182 try: 183 return serializer, serializer_id, serializer.dumps(data) 184 except: 185 if not current_platform: 186 raise MessageDataCanNotBeSerializedForRequestedNodeError 187 188 serializer_id = SerializerID.current_platform__custom_types 189 serializer = self.serializer__current_platform__custom_types 190 return serializer, serializer_id, serializer.dumps(data) 191 192 def deserialize(self, serializer_id: int, data: bytes) -> Any: 193 """_summary_ 194 195 Args: 196 serializer_id (int): _description_ 197 data (bytes): _description_ 198 199 Raises: 200 UnknownSerializerIDError: _description_ 201 202 Returns: 203 Any: _description_ 204 """ 205 if SerializerID.multi_platform_fast.value == serializer_id: 206 serializer: Serializer = self.serializer__multi_platform_fast 207 elif SerializerID.multi_platform.value == serializer_id: 208 serializer = self.serializer__multi_platform 209 elif SerializerID.current_platform.value == serializer_id: 210 serializer = self.serializer__current_platform 211 elif SerializerID.current_platform__custom_types.value == serializer_id: 212 serializer = self.serializer__current_platform__custom_types 213 else: 214 raise UnknownSerializerIDError(serializer_id) 215 216 return serializer.loads(data) 217 218 def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes: 219 """_summary_ 220 221 Args: 222 server_id (Hashable): _description_ 223 args (_type_): _description_ 224 kwargs (_type_): _description_ 225 226 Raises: 227 MessageCanNotBeEmptyError: _description_ 228 RuntimeError: _description_ 229 RuntimeError: _description_ 230 231 Returns: 232 bytes: _description_ 233 """ 234 result: bytes = None 235 remote_server: RemoteServer = self.servers[server_id] 236 remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes 237 remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes 238 try: 239 is_raw_request: bool = False 240 service_type_param_name: str = 'service_type' 241 request_param_name: str = 'request' 242 if 0 == number_of_provided_args(args, kwargs): 243 # an error 244 raise MessageCanNotBeEmptyError 245 elif 2 == number_of_provided_args(args, kwargs): 246 service_type: Type[Service] = None 247 request_obj: Request = None 248 params = {service_type_param_name: 0, request_param_name: 1} 249 service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 250 if service_type_param_found: 251 if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service): 252 service_type = service_type_param_value 253 else: 254 is_raw_request = True 255 else: 256 is_raw_request = True 257 258 if not is_raw_request: 259 request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs) 260 if request_param_found: 261 if isinstance(request_param_value, Request): 262 request_obj = request_param_value 263 else: 264 is_raw_request = True 265 else: 266 is_raw_request = True 267 268 if not is_raw_request: 269 if (service_type is None) or (request_obj is None): 270 raise RuntimeError # executed only if there is some bug in code 271 272 # check service existance 273 new_service: bool = False 274 if service_type not in remotely_registered_service_classes: 275 new_service = True 276 remote_server.register_service_class(service_type) 277 278 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 279 messages: List[bytes] = list() 280 281 # register new service 282 if new_service: 283 sys_data: Dict = local_service_class_info() 284 request: Dict = { 285 Fields.command_name.value: Commands.declare_service_class.value, 286 Fields.request_id.value: remote_server.gen_request_id(), 287 Fields.data.value: sys_data, 288 } 289 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 290 messages.append(serialized_request) 291 292 request_obj_type: Type[Request] = type(request_obj) 293 294 # check request type existance 295 new_request_type: bool = False 296 if request_obj_type not in remotely_registered_request_classes: 297 new_request_type = True 298 remote_server.register_request_class(request_obj) 299 300 local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type] 301 302 # register new request type 303 if new_request_type: 304 sys_data: Dict = local_request_class_info() 305 request: Dict = { 306 Fields.command_name.value: Commands.declare_service_request_class.value, 307 Fields.request_id.value: remote_server.gen_request_id(), 308 Fields.data.value: sys_data, 309 } 310 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 311 messages.append(serialized_request) 312 313 # - actual request to service 314 request_data: Dict = local_request_class_info.request_to_data(request_obj) 315 request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id 316 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 317 request: Dict = { 318 Fields.command_name.value: Commands.send_service_request_with_request_class.value, 319 Fields.request_id.value: remote_server.gen_request_id(), 320 Fields.is_response_required.value: 1, 321 Fields.data_serializer_id.value: serializer_id.value, 322 Fields.data.value: request_data, 323 } 324 325 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 326 messages.append(serialized_request) 327 result = b''.join(messages) 328 else: 329 params = {service_type_param_name: 0} 330 found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 331 if found: 332 if pos is None: 333 del kwargs[service_type_param_name] 334 else: 335 args = args[1:] 336 337 if isinstance(value, type) and issubclass(value, Service): 338 service_type: Type[Service] = value 339 340 # check service existance 341 new_service: bool = False 342 if service_type not in remotely_registered_service_classes: 343 new_service = True 344 remote_server.register_service_class(service_type) 345 346 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 347 348 messages: List[bytes] = list() 349 350 # register new service 351 if new_service: 352 sys_data: Dict = local_service_class_info() 353 request: Dict = { 354 Fields.command_name.value: Commands.declare_service_class.value, 355 Fields.request_id.value: remote_server.gen_request_id(), 356 Fields.data.value: sys_data, 357 } 358 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 359 messages.append(serialized_request) 360 361 # actual request to service 362 request_data: Dict = { 363 CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id, 364 } 365 explicit_serializer_required: bool = False 366 if args: 367 explicit_serializer_required = True 368 request_data[CommandDataFieldsServiceRequest.args.value] = args 369 370 if kwargs: 371 explicit_serializer_required = True 372 request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs 373 374 if explicit_serializer_required: 375 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 376 377 request: Dict = { 378 Fields.command_name.value: Commands.send_service_request.value, 379 Fields.request_id.value: remote_server.gen_request_id(), 380 Fields.is_response_required.value: 1, 381 Fields.data.value: request_data, 382 } 383 if explicit_serializer_required: 384 request[Fields.data_serializer_id.value] = serializer_id.value 385 386 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 387 messages.append(serialized_request) 388 result = b''.join(messages) 389 else: 390 is_raw_request = True 391 else: 392 is_raw_request = True 393 394 if is_raw_request: 395 # raw request 396 request_data: Dict = { 397 CommandDataFieldsRequest.args.value: args, 398 CommandDataFieldsRequest.kwargs.value: kwargs, 399 } 400 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 401 402 request: Dict = { 403 Fields.command_name.value: Commands.send_request.value, 404 Fields.request_id.value: remote_server.gen_request_id(), 405 Fields.is_response_required.value: 1, 406 Fields.data_serializer_id.value: serializer_id.value, 407 Fields.data.value: request_data, 408 } 409 410 result = self.serializer__multi_platform_fast.dumps(request) 411 except UnknownArgumentError: 412 raise RuntimeError # executed only if there is some bug in code 413 414 return result 415 416 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 417 return type(self).__name__, { 418 'instances num': len(self.instances), 419 } 420 421 def single_task_registration_or_immediate_processing_single( 422 self, *args, **kwargs 423 ) -> Tuple[bool, Optional[CoroID], Any]: 424 number_of_args: int = number_of_provided_args(args, kwargs) 425 if 1 == number_of_args: 426 return self._on_get(*args, **kwargs) 427 elif 2 == number_of_args: 428 return self._on_set(*args, **kwargs) 429 else: 430 return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}') 431 432 def full_processing_iteration(self): 433 results_bak = self.results 434 self.results = type(results_bak)() 435 for waiter_coro_id, instance in results_bak.items(): 436 self.register_response(waiter_coro_id, instance) 437 438 self.make_dead() 439 440 def in_work(self) -> bool: 441 return self.thrifty_in_work(bool(self.results)) 442 443 def _on_start(self, host=None, port=None): 444 if State.stopped != self.state: 445 return True, None, RuntimeError('Already started or not yet stopped.') 446 else: 447 ... 448 449 return True, None, None
97 def __init__(self, loop: CoroSchedulerType): 98 super(RemoteNodes, self).__init__(loop) 99 self._request_workers = { 100 0: self._on_start, 101 1: self._on_stop, 102 2: self._on_connect, 103 2: self._on_disconnect, 104 } 105 self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}' 106 self.state: State = State.stopped 107 self.servers: Dict[Hashable, RemoteServer] = dict() 108 self.clients: Dict[Hashable, RemoteClient] = dict() 109 110 self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data(( 111 DataFormats.any, 112 Tags.current_platform, 113 Tags.deep, 114 Tags.can_use_custom_types, 115 Tags.can_use_bytes, 116 Tags.can_use_set, 117 Tags.decode_str_as_str, 118 Tags.decode_bytes_as_bytes, 119 Tags.decode_tuple_as_tuple, 120 Tags.decode_list_as_list, 121 ), TestDataType.deep_large, 0.1) 122 print(self.serializer__current_platform__custom_types.serializer) 123 124 self.serializer__current_platform: Serializer = best_serializer_for_standard_data(( 125 DataFormats.any, 126 Tags.deep, 127 Tags.can_use_set, 128 Tags.can_use_bytes, 129 Tags.decode_str_as_str, 130 Tags.decode_bytes_as_bytes, 131 Tags.decode_tuple_as_tuple, 132 Tags.decode_list_as_list, 133 ), TestDataType.deep_large, 0.1) 134 print(self.serializer__current_platform.serializer) 135 136 self.serializer__multi_platform: Serializer = best_serializer_for_standard_data(( 137 DataFormats.any, 138 Tags.deep, 139 Tags.multi_platform, 140 Tags.can_use_bytes, 141 Tags.decode_str_as_str, 142 Tags.decode_list_as_list, 143 ), TestDataType.deep_large, 0.1) 144 print(self.serializer__multi_platform.serializer) 145 146 self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data(( 147 DataFormats.any, 148 Tags.deep, 149 Tags.multi_platform, 150 Tags.can_use_bytes, 151 ), TestDataType.small, 0.1) 152 print(self.serializer__multi_platform_fast.serializer) 153 154 self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data(( 155 DataFormats.json, 156 Tags.deep, 157 Tags.multi_platform, 158 ), TestDataType.small, 0.1) 159 print(self.serializer__multi_platform_fast.serializer)
161 def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]: 162 """_summary_ 163 164 Args: 165 foreign_platform_name (_type_): _description_ 166 data (_type_): _description_ 167 168 Raises: 169 MessageDataCanNotBeSerializedForRequestedNodeError: _description_ 170 171 Returns: 172 Tuple[Serializer, SerializerID, bytes]: _description_ 173 """ 174 current_platform = is_current_platform(self.platform_name, foreign_platform_name) 175 if current_platform: 176 serializer_id: SerializerID = SerializerID.current_platform 177 serializer: Serializer = self.serializer__current_platform 178 else: 179 serializer_id = SerializerID.multi_platform 180 serializer = self.serializer__multi_platform 181 182 try: 183 return serializer, serializer_id, serializer.dumps(data) 184 except: 185 if not current_platform: 186 raise MessageDataCanNotBeSerializedForRequestedNodeError 187 188 serializer_id = SerializerID.current_platform__custom_types 189 serializer = self.serializer__current_platform__custom_types 190 return serializer, serializer_id, serializer.dumps(data)
_summary_
Args: foreign_platform_name (_type_): _description_ data (_type_): _description_
Raises: MessageDataCanNotBeSerializedForRequestedNodeError: _description_
Returns: Tuple[Serializer, SerializerID, bytes]: _description_
192 def deserialize(self, serializer_id: int, data: bytes) -> Any: 193 """_summary_ 194 195 Args: 196 serializer_id (int): _description_ 197 data (bytes): _description_ 198 199 Raises: 200 UnknownSerializerIDError: _description_ 201 202 Returns: 203 Any: _description_ 204 """ 205 if SerializerID.multi_platform_fast.value == serializer_id: 206 serializer: Serializer = self.serializer__multi_platform_fast 207 elif SerializerID.multi_platform.value == serializer_id: 208 serializer = self.serializer__multi_platform 209 elif SerializerID.current_platform.value == serializer_id: 210 serializer = self.serializer__current_platform 211 elif SerializerID.current_platform__custom_types.value == serializer_id: 212 serializer = self.serializer__current_platform__custom_types 213 else: 214 raise UnknownSerializerIDError(serializer_id) 215 216 return serializer.loads(data)
_summary_
Args: serializer_id (int): _description_ data (bytes): _description_
Raises: UnknownSerializerIDError: _description_
Returns: Any: _description_
218 def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes: 219 """_summary_ 220 221 Args: 222 server_id (Hashable): _description_ 223 args (_type_): _description_ 224 kwargs (_type_): _description_ 225 226 Raises: 227 MessageCanNotBeEmptyError: _description_ 228 RuntimeError: _description_ 229 RuntimeError: _description_ 230 231 Returns: 232 bytes: _description_ 233 """ 234 result: bytes = None 235 remote_server: RemoteServer = self.servers[server_id] 236 remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes 237 remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes 238 try: 239 is_raw_request: bool = False 240 service_type_param_name: str = 'service_type' 241 request_param_name: str = 'request' 242 if 0 == number_of_provided_args(args, kwargs): 243 # an error 244 raise MessageCanNotBeEmptyError 245 elif 2 == number_of_provided_args(args, kwargs): 246 service_type: Type[Service] = None 247 request_obj: Request = None 248 params = {service_type_param_name: 0, request_param_name: 1} 249 service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 250 if service_type_param_found: 251 if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service): 252 service_type = service_type_param_value 253 else: 254 is_raw_request = True 255 else: 256 is_raw_request = True 257 258 if not is_raw_request: 259 request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs) 260 if request_param_found: 261 if isinstance(request_param_value, Request): 262 request_obj = request_param_value 263 else: 264 is_raw_request = True 265 else: 266 is_raw_request = True 267 268 if not is_raw_request: 269 if (service_type is None) or (request_obj is None): 270 raise RuntimeError # executed only if there is some bug in code 271 272 # check service existance 273 new_service: bool = False 274 if service_type not in remotely_registered_service_classes: 275 new_service = True 276 remote_server.register_service_class(service_type) 277 278 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 279 messages: List[bytes] = list() 280 281 # register new service 282 if new_service: 283 sys_data: Dict = local_service_class_info() 284 request: Dict = { 285 Fields.command_name.value: Commands.declare_service_class.value, 286 Fields.request_id.value: remote_server.gen_request_id(), 287 Fields.data.value: sys_data, 288 } 289 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 290 messages.append(serialized_request) 291 292 request_obj_type: Type[Request] = type(request_obj) 293 294 # check request type existance 295 new_request_type: bool = False 296 if request_obj_type not in remotely_registered_request_classes: 297 new_request_type = True 298 remote_server.register_request_class(request_obj) 299 300 local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type] 301 302 # register new request type 303 if new_request_type: 304 sys_data: Dict = local_request_class_info() 305 request: Dict = { 306 Fields.command_name.value: Commands.declare_service_request_class.value, 307 Fields.request_id.value: remote_server.gen_request_id(), 308 Fields.data.value: sys_data, 309 } 310 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 311 messages.append(serialized_request) 312 313 # - actual request to service 314 request_data: Dict = local_request_class_info.request_to_data(request_obj) 315 request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id 316 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 317 request: Dict = { 318 Fields.command_name.value: Commands.send_service_request_with_request_class.value, 319 Fields.request_id.value: remote_server.gen_request_id(), 320 Fields.is_response_required.value: 1, 321 Fields.data_serializer_id.value: serializer_id.value, 322 Fields.data.value: request_data, 323 } 324 325 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 326 messages.append(serialized_request) 327 result = b''.join(messages) 328 else: 329 params = {service_type_param_name: 0} 330 found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs) 331 if found: 332 if pos is None: 333 del kwargs[service_type_param_name] 334 else: 335 args = args[1:] 336 337 if isinstance(value, type) and issubclass(value, Service): 338 service_type: Type[Service] = value 339 340 # check service existance 341 new_service: bool = False 342 if service_type not in remotely_registered_service_classes: 343 new_service = True 344 remote_server.register_service_class(service_type) 345 346 local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type] 347 348 messages: List[bytes] = list() 349 350 # register new service 351 if new_service: 352 sys_data: Dict = local_service_class_info() 353 request: Dict = { 354 Fields.command_name.value: Commands.declare_service_class.value, 355 Fields.request_id.value: remote_server.gen_request_id(), 356 Fields.data.value: sys_data, 357 } 358 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 359 messages.append(serialized_request) 360 361 # actual request to service 362 request_data: Dict = { 363 CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id, 364 } 365 explicit_serializer_required: bool = False 366 if args: 367 explicit_serializer_required = True 368 request_data[CommandDataFieldsServiceRequest.args.value] = args 369 370 if kwargs: 371 explicit_serializer_required = True 372 request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs 373 374 if explicit_serializer_required: 375 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 376 377 request: Dict = { 378 Fields.command_name.value: Commands.send_service_request.value, 379 Fields.request_id.value: remote_server.gen_request_id(), 380 Fields.is_response_required.value: 1, 381 Fields.data.value: request_data, 382 } 383 if explicit_serializer_required: 384 request[Fields.data_serializer_id.value] = serializer_id.value 385 386 serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request) 387 messages.append(serialized_request) 388 result = b''.join(messages) 389 else: 390 is_raw_request = True 391 else: 392 is_raw_request = True 393 394 if is_raw_request: 395 # raw request 396 request_data: Dict = { 397 CommandDataFieldsRequest.args.value: args, 398 CommandDataFieldsRequest.kwargs.value: kwargs, 399 } 400 _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data) 401 402 request: Dict = { 403 Fields.command_name.value: Commands.send_request.value, 404 Fields.request_id.value: remote_server.gen_request_id(), 405 Fields.is_response_required.value: 1, 406 Fields.data_serializer_id.value: serializer_id.value, 407 Fields.data.value: request_data, 408 } 409 410 result = self.serializer__multi_platform_fast.dumps(request) 411 except UnknownArgumentError: 412 raise RuntimeError # executed only if there is some bug in code 413 414 return result
_summary_
Args: server_id (Hashable): _description_ args (_type_): _description_ kwargs (_type_): _description_
Raises: MessageCanNotBeEmptyError: _description_ RuntimeError: _description_ RuntimeError: _description_
Returns: bytes: _description_
421 def single_task_registration_or_immediate_processing_single( 422 self, *args, **kwargs 423 ) -> Tuple[bool, Optional[CoroID], Any]: 424 number_of_args: int = number_of_provided_args(args, kwargs) 425 if 1 == number_of_args: 426 return self._on_get(*args, **kwargs) 427 elif 2 == number_of_args: 428 return self._on_set(*args, **kwargs) 429 else: 430 return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}')
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
- single_task_registration_or_immediate_processing
- single_task_registration_or_immediate_processing_multiple
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
74class RemoteNodesRequest(ServiceRequest): 75 def start(self, host=None, port=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest: 76 return self._save(0, host, port) 77 def stop(self) -> ServiceRequest: 78 return self._save(1) 79 def connect(self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest: 80 return self._save(2, host, port, connection_id_alias) 81 def disconnect(self, connection_id) -> ServiceRequest: 82 return self._save(3, connection_id)
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai