cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.remote_nodes

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['RemoteNodes', 'RemoteNodesRequest']
 20
 21from enum import Enum
 22from cengal.parallel_execution.coroutines.coro_scheduler import *
 23from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
 24from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
 25from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
 26from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
 27from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
 28from cengal.file_system.file_manager import path_relative_to_current_dir
 29from cengal.time_management.cpu_clock_cycles import perf_counter
 30from cengal.data_manipulation.serialization import *
 31from typing import Hashable, Tuple, List, Any, Dict, Callable, Type
 32from cengal.introspection.inspect import get_exception, entity_owning_module_importable_str, entity_owning_module_info_and_owning_path, entity_properties
 33from cengal.io.core.memory_management import IOCoreMemoryManagement
 34from cengal.parallel_execution.asyncio.efficient_streams import StreamManagerIOCoreMemoryManagement, TcpStreamManager, UdpStreamManager, StreamManagerAbstract
 35from cengal.code_flow_control.smart_values import ValueExistence
 36from cengal.io.named_connections.named_connections_manager import NamedConnectionsManager
 37from cengal.code_flow_control.args_manager import number_of_provided_args
 38from cengal.data_manipulation.serialization import Serializer, Serializers, best_serializer_for_standard_data
 39from cengal.code_flow_control.args_manager import find_arg_position_and_value, UnknownArgumentError
 40from cengal.data_generation.id_generator import IDGenerator, GeneratorType
 41from cengal.system import PLATFORM_NAME, PYTHON_VERSION
 42from importlib import import_module
 43import sys
 44import os
 45import asyncio
 46import lmdb
 47
 48from .exceptions import *
 49from .commands import *
 50from .class_info import *
 51from .request_class_info import *
 52from .remote_node import *
 53from .serializers import *
 54
 55
 56"""
 57Module Docstring
 58Docstrings: http://www.python.org/dev/peps/pep-0257/
 59"""
 60
 61__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 62__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 63__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 64__license__ = "Apache License, Version 2.0"
 65__version__ = "4.4.1"
 66__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 67__email__ = "gtalk@butenkoms.space"
 68# __status__ = "Prototype"
 69__status__ = "Development"
 70# __status__ = "Production"
 71
 72
 73class RemoteNodesRequest(ServiceRequest):
 74    def start(self, host=None, port=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
 75        return self._save(0, host, port)
 76    def stop(self) -> ServiceRequest:
 77        return self._save(1)
 78    def connect(self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
 79        return self._save(2, host, port, connection_id_alias)
 80    def disconnect(self, connection_id) -> ServiceRequest:
 81        return self._save(3, connection_id)
 82
 83
 84class State(Enum):
 85    start_initiated = 0
 86    started = 1
 87    stop_initiated = 2
 88    stopped = 3
 89
 90
 91def is_current_platform(current_platform_name, foreign_platform_name):
 92    return current_platform_name == foreign_platform_name
 93
 94
 95class RemoteNodes(DualImmediateProcessingServiceMixin, Service, EntityStatsMixin):
 96    def __init__(self, loop: CoroSchedulerType):
 97        super(RemoteNodes, self).__init__(loop)
 98        self._request_workers = {
 99            0: self._on_start,
100            1: self._on_stop,
101            2: self._on_connect,
102            2: self._on_disconnect,
103        }
104        self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}'
105        self.state: State = State.stopped
106        self.servers: Dict[Hashable, RemoteServer] = dict()
107        self.clients: Dict[Hashable, RemoteClient] = dict()
108
109        self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data((
110            DataFormats.any,
111            Tags.current_platform,
112            Tags.deep,
113            Tags.can_use_custom_types,
114            Tags.can_use_bytes,
115            Tags.can_use_set,
116            Tags.decode_str_as_str,
117            Tags.decode_bytes_as_bytes,
118            Tags.decode_tuple_as_tuple,
119            Tags.decode_list_as_list,
120        ), TestDataType.deep_large, 0.1)
121        print(self.serializer__current_platform__custom_types.serializer)
122
123        self.serializer__current_platform: Serializer = best_serializer_for_standard_data((
124            DataFormats.any,
125            Tags.deep,
126            Tags.can_use_set,
127            Tags.can_use_bytes,
128            Tags.decode_str_as_str,
129            Tags.decode_bytes_as_bytes,
130            Tags.decode_tuple_as_tuple,
131            Tags.decode_list_as_list,
132        ), TestDataType.deep_large, 0.1)
133        print(self.serializer__current_platform.serializer)
134
135        self.serializer__multi_platform: Serializer = best_serializer_for_standard_data((
136            DataFormats.any,
137            Tags.deep,
138            Tags.multi_platform,
139            Tags.can_use_bytes,
140            Tags.decode_str_as_str,
141            Tags.decode_list_as_list,
142        ), TestDataType.deep_large, 0.1)
143        print(self.serializer__multi_platform.serializer)
144
145        self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data((
146            DataFormats.any,
147            Tags.deep,
148            Tags.multi_platform,
149            Tags.can_use_bytes,
150        ), TestDataType.small, 0.1)
151        print(self.serializer__multi_platform_fast.serializer)
152
153        self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data((
154            DataFormats.json,
155            Tags.deep,
156            Tags.multi_platform,
157        ), TestDataType.small, 0.1)
158        print(self.serializer__multi_platform_fast.serializer)
159    
160    def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]:
161        """_summary_
162
163        Args:
164            foreign_platform_name (_type_): _description_
165            data (_type_): _description_
166
167        Raises:
168            MessageDataCanNotBeSerializedForRequestedNodeError: _description_
169
170        Returns:
171            Tuple[Serializer, SerializerID, bytes]: _description_
172        """        
173        current_platform = is_current_platform(self.platform_name, foreign_platform_name)
174        if current_platform:
175            serializer_id: SerializerID = SerializerID.current_platform
176            serializer: Serializer = self.serializer__current_platform
177        else:
178            serializer_id = SerializerID.multi_platform
179            serializer = self.serializer__multi_platform
180        
181        try:
182            return serializer, serializer_id, serializer.dumps(data)
183        except:
184            if not current_platform:
185                raise MessageDataCanNotBeSerializedForRequestedNodeError
186        
187        serializer_id = SerializerID.current_platform__custom_types
188        serializer = self.serializer__current_platform__custom_types
189        return serializer, serializer_id, serializer.dumps(data)
190    
191    def deserialize(self, serializer_id: int, data: bytes) -> Any:
192        """_summary_
193
194        Args:
195            serializer_id (int): _description_
196            data (bytes): _description_
197
198        Raises:
199            UnknownSerializerIDError: _description_
200
201        Returns:
202            Any: _description_
203        """        
204        if SerializerID.multi_platform_fast.value == serializer_id:
205            serializer: Serializer = self.serializer__multi_platform_fast
206        elif SerializerID.multi_platform.value == serializer_id:
207            serializer = self.serializer__multi_platform
208        elif SerializerID.current_platform.value == serializer_id:
209            serializer = self.serializer__current_platform
210        elif SerializerID.current_platform__custom_types.value == serializer_id:
211            serializer = self.serializer__current_platform__custom_types
212        else:
213            raise UnknownSerializerIDError(serializer_id)
214
215        return serializer.loads(data)
216    
217    def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes:
218        """_summary_
219
220        Args:
221            server_id (Hashable): _description_
222            args (_type_): _description_
223            kwargs (_type_): _description_
224
225        Raises:
226            MessageCanNotBeEmptyError: _description_
227            RuntimeError: _description_
228            RuntimeError: _description_
229
230        Returns:
231            bytes: _description_
232        """        
233        result: bytes = None
234        remote_server: RemoteServer = self.servers[server_id]
235        remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes
236        remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes
237        try:
238            is_raw_request: bool = False
239            service_type_param_name: str = 'service_type'
240            request_param_name: str  = 'request'
241            if 0 == number_of_provided_args(args, kwargs):
242                # an error
243                raise MessageCanNotBeEmptyError
244            elif 2 == number_of_provided_args(args, kwargs):
245                service_type: Type[Service] = None
246                request_obj: Request = None
247                params = {service_type_param_name: 0, request_param_name: 1}
248                service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
249                if service_type_param_found:
250                    if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service):
251                        service_type = service_type_param_value
252                    else:
253                        is_raw_request = True
254                else:
255                    is_raw_request = True
256                
257                if not is_raw_request:
258                    request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs)
259                    if request_param_found:
260                        if isinstance(request_param_value, Request):
261                            request_obj = request_param_value
262                        else:
263                            is_raw_request = True
264                    else:
265                        is_raw_request = True
266                
267                if not is_raw_request:
268                    if (service_type is None) or (request_obj is None):
269                        raise RuntimeError  # executed only if there is some bug in code
270                    
271                    # check service existance
272                    new_service: bool = False
273                    if service_type not in remotely_registered_service_classes:
274                        new_service = True
275                        remote_server.register_service_class(service_type)
276
277                    local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
278                    messages: List[bytes] = list()
279                    
280                    # register new service
281                    if new_service:
282                        sys_data: Dict = local_service_class_info()
283                        request: Dict = {
284                            Fields.command_name.value: Commands.declare_service_class.value,
285                            Fields.request_id.value: remote_server.gen_request_id(),
286                            Fields.data.value: sys_data,
287                        }
288                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
289                        messages.append(serialized_request)
290                    
291                    request_obj_type: Type[Request] = type(request_obj)
292
293                    # check request type existance
294                    new_request_type: bool = False
295                    if request_obj_type not in remotely_registered_request_classes:
296                        new_request_type = True
297                        remote_server.register_request_class(request_obj)
298
299                    local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type]
300                    
301                    # register new request type
302                    if new_request_type:
303                        sys_data: Dict = local_request_class_info()
304                        request: Dict = {
305                            Fields.command_name.value: Commands.declare_service_request_class.value,
306                            Fields.request_id.value: remote_server.gen_request_id(),
307                            Fields.data.value: sys_data,
308                        }
309                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
310                        messages.append(serialized_request)
311
312                    # - actual request to service
313                    request_data: Dict = local_request_class_info.request_to_data(request_obj)
314                    request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id
315                    _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
316                    request: Dict = {
317                        Fields.command_name.value: Commands.send_service_request_with_request_class.value,
318                        Fields.request_id.value: remote_server.gen_request_id(),
319                        Fields.is_response_required.value: 1,
320                        Fields.data_serializer_id.value: serializer_id.value,
321                        Fields.data.value: request_data,
322                    }
323                    
324                    serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
325                    messages.append(serialized_request)
326                    result = b''.join(messages)
327            else:
328                params = {service_type_param_name: 0}
329                found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
330                if found:
331                    if pos is None:
332                        del kwargs[service_type_param_name]
333                    else:
334                        args = args[1:]
335                    
336                    if isinstance(value, type) and issubclass(value, Service):
337                        service_type: Type[Service] = value
338
339                        # check service existance
340                        new_service: bool = False
341                        if service_type not in remotely_registered_service_classes:
342                            new_service = True
343                            remote_server.register_service_class(service_type)
344
345                        local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
346                        
347                        messages: List[bytes] = list()
348                        
349                        # register new service
350                        if new_service:
351                            sys_data: Dict = local_service_class_info()
352                            request: Dict = {
353                                Fields.command_name.value: Commands.declare_service_class.value,
354                                Fields.request_id.value: remote_server.gen_request_id(),
355                                Fields.data.value: sys_data,
356                            }
357                            serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
358                            messages.append(serialized_request)
359
360                        # actual request to service
361                        request_data: Dict = {
362                            CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id,
363                        }
364                        explicit_serializer_required: bool = False
365                        if args:
366                            explicit_serializer_required = True
367                            request_data[CommandDataFieldsServiceRequest.args.value] = args
368                        
369                        if kwargs:
370                            explicit_serializer_required = True
371                            request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs
372                        
373                        if explicit_serializer_required:
374                            _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
375                        
376                        request: Dict = {
377                            Fields.command_name.value: Commands.send_service_request.value,
378                            Fields.request_id.value: remote_server.gen_request_id(),
379                            Fields.is_response_required.value: 1,
380                            Fields.data.value: request_data,
381                        }
382                        if explicit_serializer_required:
383                            request[Fields.data_serializer_id.value] = serializer_id.value
384                        
385                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
386                        messages.append(serialized_request)
387                        result = b''.join(messages)
388                    else:
389                        is_raw_request = True
390                else:
391                    is_raw_request = True
392            
393            if is_raw_request:
394                # raw request
395                request_data: Dict = {
396                    CommandDataFieldsRequest.args.value: args,
397                    CommandDataFieldsRequest.kwargs.value: kwargs,
398                }
399                _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
400                
401                request: Dict = {
402                    Fields.command_name.value: Commands.send_request.value,
403                    Fields.request_id.value: remote_server.gen_request_id(),
404                    Fields.is_response_required.value: 1,
405                    Fields.data_serializer_id.value: serializer_id.value,
406                    Fields.data.value: request_data,
407                }
408                
409                result = self.serializer__multi_platform_fast.dumps(request)
410        except UnknownArgumentError:
411            raise RuntimeError  # executed only if there is some bug in code
412        
413        return result
414
415    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
416        return type(self).__name__, {
417            'instances num': len(self.instances),
418        }
419
420    def single_task_registration_or_immediate_processing_single(
421            self, *args, **kwargs
422    ) -> Tuple[bool, Optional[CoroID], Any]:
423        number_of_args: int = number_of_provided_args(args, kwargs)
424        if 1 == number_of_args:
425            return self._on_get(*args, **kwargs)
426        elif 2 == number_of_args:
427            return self._on_set(*args, **kwargs)
428        else:
429            return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}')
430
431    def full_processing_iteration(self):
432        results_bak = self.results
433        self.results = type(results_bak)()
434        for waiter_coro_id, instance in results_bak.items():
435            self.register_response(waiter_coro_id, instance)
436
437        self.make_dead()
438
439    def in_work(self) -> bool:
440        return self.thrifty_in_work(bool(self.results))
441    
442    def _on_start(self, host=None, port=None):
443        if State.stopped != self.state:
444            return True, None, RuntimeError('Already started or not yet stopped.')
445        else:
446            ...
447        
448        return True, None, None
449
450
# Set RemoteNodes as the default service type of RemoteNodesRequest. This is
# done after the class definition because RemoteNodesRequest is declared first.
RemoteNodesRequest.default_service_type = RemoteNodes
class RemoteNodes(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
 96class RemoteNodes(DualImmediateProcessingServiceMixin, Service, EntityStatsMixin):
 97    def __init__(self, loop: CoroSchedulerType):
 98        super(RemoteNodes, self).__init__(loop)
 99        self._request_workers = {
100            0: self._on_start,
101            1: self._on_stop,
102            2: self._on_connect,
103            2: self._on_disconnect,
104        }
105        self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}'
106        self.state: State = State.stopped
107        self.servers: Dict[Hashable, RemoteServer] = dict()
108        self.clients: Dict[Hashable, RemoteClient] = dict()
109
110        self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data((
111            DataFormats.any,
112            Tags.current_platform,
113            Tags.deep,
114            Tags.can_use_custom_types,
115            Tags.can_use_bytes,
116            Tags.can_use_set,
117            Tags.decode_str_as_str,
118            Tags.decode_bytes_as_bytes,
119            Tags.decode_tuple_as_tuple,
120            Tags.decode_list_as_list,
121        ), TestDataType.deep_large, 0.1)
122        print(self.serializer__current_platform__custom_types.serializer)
123
124        self.serializer__current_platform: Serializer = best_serializer_for_standard_data((
125            DataFormats.any,
126            Tags.deep,
127            Tags.can_use_set,
128            Tags.can_use_bytes,
129            Tags.decode_str_as_str,
130            Tags.decode_bytes_as_bytes,
131            Tags.decode_tuple_as_tuple,
132            Tags.decode_list_as_list,
133        ), TestDataType.deep_large, 0.1)
134        print(self.serializer__current_platform.serializer)
135
136        self.serializer__multi_platform: Serializer = best_serializer_for_standard_data((
137            DataFormats.any,
138            Tags.deep,
139            Tags.multi_platform,
140            Tags.can_use_bytes,
141            Tags.decode_str_as_str,
142            Tags.decode_list_as_list,
143        ), TestDataType.deep_large, 0.1)
144        print(self.serializer__multi_platform.serializer)
145
146        self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data((
147            DataFormats.any,
148            Tags.deep,
149            Tags.multi_platform,
150            Tags.can_use_bytes,
151        ), TestDataType.small, 0.1)
152        print(self.serializer__multi_platform_fast.serializer)
153
154        self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data((
155            DataFormats.json,
156            Tags.deep,
157            Tags.multi_platform,
158        ), TestDataType.small, 0.1)
159        print(self.serializer__multi_platform_fast.serializer)
160    
161    def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]:
162        """_summary_
163
164        Args:
165            foreign_platform_name (_type_): _description_
166            data (_type_): _description_
167
168        Raises:
169            MessageDataCanNotBeSerializedForRequestedNodeError: _description_
170
171        Returns:
172            Tuple[Serializer, SerializerID, bytes]: _description_
173        """        
174        current_platform = is_current_platform(self.platform_name, foreign_platform_name)
175        if current_platform:
176            serializer_id: SerializerID = SerializerID.current_platform
177            serializer: Serializer = self.serializer__current_platform
178        else:
179            serializer_id = SerializerID.multi_platform
180            serializer = self.serializer__multi_platform
181        
182        try:
183            return serializer, serializer_id, serializer.dumps(data)
184        except:
185            if not current_platform:
186                raise MessageDataCanNotBeSerializedForRequestedNodeError
187        
188        serializer_id = SerializerID.current_platform__custom_types
189        serializer = self.serializer__current_platform__custom_types
190        return serializer, serializer_id, serializer.dumps(data)
191    
192    def deserialize(self, serializer_id: int, data: bytes) -> Any:
193        """_summary_
194
195        Args:
196            serializer_id (int): _description_
197            data (bytes): _description_
198
199        Raises:
200            UnknownSerializerIDError: _description_
201
202        Returns:
203            Any: _description_
204        """        
205        if SerializerID.multi_platform_fast.value == serializer_id:
206            serializer: Serializer = self.serializer__multi_platform_fast
207        elif SerializerID.multi_platform.value == serializer_id:
208            serializer = self.serializer__multi_platform
209        elif SerializerID.current_platform.value == serializer_id:
210            serializer = self.serializer__current_platform
211        elif SerializerID.current_platform__custom_types.value == serializer_id:
212            serializer = self.serializer__current_platform__custom_types
213        else:
214            raise UnknownSerializerIDError(serializer_id)
215
216        return serializer.loads(data)
217    
218    def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes:
219        """_summary_
220
221        Args:
222            server_id (Hashable): _description_
223            args (_type_): _description_
224            kwargs (_type_): _description_
225
226        Raises:
227            MessageCanNotBeEmptyError: _description_
228            RuntimeError: _description_
229            RuntimeError: _description_
230
231        Returns:
232            bytes: _description_
233        """        
234        result: bytes = None
235        remote_server: RemoteServer = self.servers[server_id]
236        remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes
237        remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes
238        try:
239            is_raw_request: bool = False
240            service_type_param_name: str = 'service_type'
241            request_param_name: str  = 'request'
242            if 0 == number_of_provided_args(args, kwargs):
243                # an error
244                raise MessageCanNotBeEmptyError
245            elif 2 == number_of_provided_args(args, kwargs):
246                service_type: Type[Service] = None
247                request_obj: Request = None
248                params = {service_type_param_name: 0, request_param_name: 1}
249                service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
250                if service_type_param_found:
251                    if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service):
252                        service_type = service_type_param_value
253                    else:
254                        is_raw_request = True
255                else:
256                    is_raw_request = True
257                
258                if not is_raw_request:
259                    request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs)
260                    if request_param_found:
261                        if isinstance(request_param_value, Request):
262                            request_obj = request_param_value
263                        else:
264                            is_raw_request = True
265                    else:
266                        is_raw_request = True
267                
268                if not is_raw_request:
269                    if (service_type is None) or (request_obj is None):
270                        raise RuntimeError  # executed only if there is some bug in code
271                    
272                    # check service existance
273                    new_service: bool = False
274                    if service_type not in remotely_registered_service_classes:
275                        new_service = True
276                        remote_server.register_service_class(service_type)
277
278                    local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
279                    messages: List[bytes] = list()
280                    
281                    # register new service
282                    if new_service:
283                        sys_data: Dict = local_service_class_info()
284                        request: Dict = {
285                            Fields.command_name.value: Commands.declare_service_class.value,
286                            Fields.request_id.value: remote_server.gen_request_id(),
287                            Fields.data.value: sys_data,
288                        }
289                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
290                        messages.append(serialized_request)
291                    
292                    request_obj_type: Type[Request] = type(request_obj)
293
294                    # check request type existance
295                    new_request_type: bool = False
296                    if request_obj_type not in remotely_registered_request_classes:
297                        new_request_type = True
298                        remote_server.register_request_class(request_obj)
299
300                    local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type]
301                    
302                    # register new request type
303                    if new_request_type:
304                        sys_data: Dict = local_request_class_info()
305                        request: Dict = {
306                            Fields.command_name.value: Commands.declare_service_request_class.value,
307                            Fields.request_id.value: remote_server.gen_request_id(),
308                            Fields.data.value: sys_data,
309                        }
310                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
311                        messages.append(serialized_request)
312
313                    # - actual request to service
314                    request_data: Dict = local_request_class_info.request_to_data(request_obj)
315                    request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id
316                    _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
317                    request: Dict = {
318                        Fields.command_name.value: Commands.send_service_request_with_request_class.value,
319                        Fields.request_id.value: remote_server.gen_request_id(),
320                        Fields.is_response_required.value: 1,
321                        Fields.data_serializer_id.value: serializer_id.value,
322                        Fields.data.value: request_data,
323                    }
324                    
325                    serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
326                    messages.append(serialized_request)
327                    result = b''.join(messages)
328            else:
329                params = {service_type_param_name: 0}
330                found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
331                if found:
332                    if pos is None:
333                        del kwargs[service_type_param_name]
334                    else:
335                        args = args[1:]
336                    
337                    if isinstance(value, type) and issubclass(value, Service):
338                        service_type: Type[Service] = value
339
340                        # check service existance
341                        new_service: bool = False
342                        if service_type not in remotely_registered_service_classes:
343                            new_service = True
344                            remote_server.register_service_class(service_type)
345
346                        local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
347                        
348                        messages: List[bytes] = list()
349                        
350                        # register new service
351                        if new_service:
352                            sys_data: Dict = local_service_class_info()
353                            request: Dict = {
354                                Fields.command_name.value: Commands.declare_service_class.value,
355                                Fields.request_id.value: remote_server.gen_request_id(),
356                                Fields.data.value: sys_data,
357                            }
358                            serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
359                            messages.append(serialized_request)
360
361                        # actual request to service
362                        request_data: Dict = {
363                            CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id,
364                        }
365                        explicit_serializer_required: bool = False
366                        if args:
367                            explicit_serializer_required = True
368                            request_data[CommandDataFieldsServiceRequest.args.value] = args
369                        
370                        if kwargs:
371                            explicit_serializer_required = True
372                            request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs
373                        
374                        if explicit_serializer_required:
375                            _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
376                        
377                        request: Dict = {
378                            Fields.command_name.value: Commands.send_service_request.value,
379                            Fields.request_id.value: remote_server.gen_request_id(),
380                            Fields.is_response_required.value: 1,
381                            Fields.data.value: request_data,
382                        }
383                        if explicit_serializer_required:
384                            request[Fields.data_serializer_id.value] = serializer_id.value
385                        
386                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
387                        messages.append(serialized_request)
388                        result = b''.join(messages)
389                    else:
390                        is_raw_request = True
391                else:
392                    is_raw_request = True
393            
394            if is_raw_request:
395                # raw request
396                request_data: Dict = {
397                    CommandDataFieldsRequest.args.value: args,
398                    CommandDataFieldsRequest.kwargs.value: kwargs,
399                }
400                _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
401                
402                request: Dict = {
403                    Fields.command_name.value: Commands.send_request.value,
404                    Fields.request_id.value: remote_server.gen_request_id(),
405                    Fields.is_response_required.value: 1,
406                    Fields.data_serializer_id.value: serializer_id.value,
407                    Fields.data.value: request_data,
408                }
409                
410                result = self.serializer__multi_platform_fast.dumps(request)
411        except UnknownArgumentError:
412            raise RuntimeError  # executed only if there is some bug in code
413        
414        return result
415
416    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
417        return type(self).__name__, {
418            'instances num': len(self.instances),
419        }
420
421    def single_task_registration_or_immediate_processing_single(
422            self, *args, **kwargs
423    ) -> Tuple[bool, Optional[CoroID], Any]:
424        number_of_args: int = number_of_provided_args(args, kwargs)
425        if 1 == number_of_args:
426            return self._on_get(*args, **kwargs)
427        elif 2 == number_of_args:
428            return self._on_set(*args, **kwargs)
429        else:
430            return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}')
431
432    def full_processing_iteration(self):
433        results_bak = self.results
434        self.results = type(results_bak)()
435        for waiter_coro_id, instance in results_bak.items():
436            self.register_response(waiter_coro_id, instance)
437
438        self.make_dead()
439
440    def in_work(self) -> bool:
441        return self.thrifty_in_work(bool(self.results))
442    
443    def _on_start(self, host=None, port=None):
444        if State.stopped != self.state:
445            return True, None, RuntimeError('Already started or not yet stopped.')
446        else:
447            ...
448        
449        return True, None, None
RemoteNodes( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
 97    def __init__(self, loop: CoroSchedulerType):
 98        super(RemoteNodes, self).__init__(loop)
 99        self._request_workers = {
100            0: self._on_start,
101            1: self._on_stop,
102            2: self._on_connect,
103            2: self._on_disconnect,
104        }
105        self.platform_name: str = f'{PLATFORM_NAME}-{".".join(PYTHON_VERSION)}'
106        self.state: State = State.stopped
107        self.servers: Dict[Hashable, RemoteServer] = dict()
108        self.clients: Dict[Hashable, RemoteClient] = dict()
109
110        self.serializer__current_platform__custom_types: Serializer = best_serializer_for_standard_data((
111            DataFormats.any,
112            Tags.current_platform,
113            Tags.deep,
114            Tags.can_use_custom_types,
115            Tags.can_use_bytes,
116            Tags.can_use_set,
117            Tags.decode_str_as_str,
118            Tags.decode_bytes_as_bytes,
119            Tags.decode_tuple_as_tuple,
120            Tags.decode_list_as_list,
121        ), TestDataType.deep_large, 0.1)
122        print(self.serializer__current_platform__custom_types.serializer)
123
124        self.serializer__current_platform: Serializer = best_serializer_for_standard_data((
125            DataFormats.any,
126            Tags.deep,
127            Tags.can_use_set,
128            Tags.can_use_bytes,
129            Tags.decode_str_as_str,
130            Tags.decode_bytes_as_bytes,
131            Tags.decode_tuple_as_tuple,
132            Tags.decode_list_as_list,
133        ), TestDataType.deep_large, 0.1)
134        print(self.serializer__current_platform.serializer)
135
136        self.serializer__multi_platform: Serializer = best_serializer_for_standard_data((
137            DataFormats.any,
138            Tags.deep,
139            Tags.multi_platform,
140            Tags.can_use_bytes,
141            Tags.decode_str_as_str,
142            Tags.decode_list_as_list,
143        ), TestDataType.deep_large, 0.1)
144        print(self.serializer__multi_platform.serializer)
145
146        self.serializer__multi_platform_fast: Serializer = best_serializer_for_standard_data((
147            DataFormats.any,
148            Tags.deep,
149            Tags.multi_platform,
150            Tags.can_use_bytes,
151        ), TestDataType.small, 0.1)
152        print(self.serializer__multi_platform_fast.serializer)
153
154        self.serializer__multi_platform__initial_communication: Serializer = best_serializer_for_standard_data((
155            DataFormats.json,
156            Tags.deep,
157            Tags.multi_platform,
158        ), TestDataType.small, 0.1)
159        print(self.serializer__multi_platform_fast.serializer)
platform_name: str
state: cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.remote_nodes.State
servers: Dict[Hashable, cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.remote_node.RemoteServer]
clients: Dict[Hashable, cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.remote_node.RemoteClient]
serializer__current_platform__custom_types: cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer
serializer__current_platform: cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer
serializer__multi_platform: cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer
serializer__multi_platform_fast: cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer
serializer__multi_platform__initial_communication: cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer
def serialize( self, foreign_platform_name, data) -> Tuple[cengal.data_manipulation.serialization.versions.v_0.serialization.Serializer, cengal.parallel_execution.coroutines.coro_standard_services.remote_nodes.versions.v_0.serializers.SerializerID, bytes]:
161    def serialize(self, foreign_platform_name, data) -> Tuple[Serializer, SerializerID, bytes]:
162        """_summary_
163
164        Args:
165            foreign_platform_name (_type_): _description_
166            data (_type_): _description_
167
168        Raises:
169            MessageDataCanNotBeSerializedForRequestedNodeError: _description_
170
171        Returns:
172            Tuple[Serializer, SerializerID, bytes]: _description_
173        """        
174        current_platform = is_current_platform(self.platform_name, foreign_platform_name)
175        if current_platform:
176            serializer_id: SerializerID = SerializerID.current_platform
177            serializer: Serializer = self.serializer__current_platform
178        else:
179            serializer_id = SerializerID.multi_platform
180            serializer = self.serializer__multi_platform
181        
182        try:
183            return serializer, serializer_id, serializer.dumps(data)
184        except:
185            if not current_platform:
186                raise MessageDataCanNotBeSerializedForRequestedNodeError
187        
188        serializer_id = SerializerID.current_platform__custom_types
189        serializer = self.serializer__current_platform__custom_types
190        return serializer, serializer_id, serializer.dumps(data)

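Serialize data with a serializer chosen for the receiving node; on the current platform a failed attempt is retried with the serializer that supports custom types.

Args: foreign_platform_name: platform name of the node the data is addressed to. data: object to serialize.

Raises: MessageDataCanNotBeSerializedForRequestedNodeError: the receiver is on another platform and the multi-platform serializer could not serialize the data.

Returns: Tuple[Serializer, SerializerID, bytes]: the serializer used, its identifier, and the serialized data.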

def deserialize(self, serializer_id: int, data: bytes) -> Any:
192    def deserialize(self, serializer_id: int, data: bytes) -> Any:
193        """_summary_
194
195        Args:
196            serializer_id (int): _description_
197            data (bytes): _description_
198
199        Raises:
200            UnknownSerializerIDError: _description_
201
202        Returns:
203            Any: _description_
204        """        
205        if SerializerID.multi_platform_fast.value == serializer_id:
206            serializer: Serializer = self.serializer__multi_platform_fast
207        elif SerializerID.multi_platform.value == serializer_id:
208            serializer = self.serializer__multi_platform
209        elif SerializerID.current_platform.value == serializer_id:
210            serializer = self.serializer__current_platform
211        elif SerializerID.current_platform__custom_types.value == serializer_id:
212            serializer = self.serializer__current_platform__custom_types
213        else:
214            raise UnknownSerializerIDError(serializer_id)
215
216        return serializer.loads(data)

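Decode data with the serializer identified by serializer_id.

Args: serializer_id (int): value of one of the SerializerID members. data (bytes): serialized payload.

Raises: UnknownSerializerIDError: serializer_id matches none of the known identifiers.

Returns: Any: the object produced by the selected serializer.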

def serialize_request(self, server_id: typing.Hashable, args, kwargs) -> bytes:
218    def serialize_request(self, server_id: Hashable, args, kwargs) -> bytes:
219        """_summary_
220
221        Args:
222            server_id (Hashable): _description_
223            args (_type_): _description_
224            kwargs (_type_): _description_
225
226        Raises:
227            MessageCanNotBeEmptyError: _description_
228            RuntimeError: _description_
229            RuntimeError: _description_
230
231        Returns:
232            bytes: _description_
233        """        
234        result: bytes = None
235        remote_server: RemoteServer = self.servers[server_id]
236        remotely_registered_service_classes: Dict[Type, LocalClassInfo] = remote_server.remotely_registered_service_classes
237        remotely_registered_request_classes: Dict[Type, LocalRequestClassInfo] = remote_server.remotely_registered_request_classes
238        try:
239            is_raw_request: bool = False
240            service_type_param_name: str = 'service_type'
241            request_param_name: str  = 'request'
242            if 0 == number_of_provided_args(args, kwargs):
243                # an error
244                raise MessageCanNotBeEmptyError
245            elif 2 == number_of_provided_args(args, kwargs):
246                service_type: Type[Service] = None
247                request_obj: Request = None
248                params = {service_type_param_name: 0, request_param_name: 1}
249                service_type_param_found, service_type_param_pos, service_type_param_value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
250                if service_type_param_found:
251                    if isinstance(service_type_param_value, type) and issubclass(service_type_param_value, Service):
252                        service_type = service_type_param_value
253                    else:
254                        is_raw_request = True
255                else:
256                    is_raw_request = True
257                
258                if not is_raw_request:
259                    request_param_found, request_param_pos, request_param_value = find_arg_position_and_value(request_param_name, params, args, kwargs)
260                    if request_param_found:
261                        if isinstance(request_param_value, Request):
262                            request_obj = request_param_value
263                        else:
264                            is_raw_request = True
265                    else:
266                        is_raw_request = True
267                
268                if not is_raw_request:
269                    if (service_type is None) or (request_obj is None):
270                        raise RuntimeError  # executed only if there is some bug in code
271                    
272                    # check service existance
273                    new_service: bool = False
274                    if service_type not in remotely_registered_service_classes:
275                        new_service = True
276                        remote_server.register_service_class(service_type)
277
278                    local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
279                    messages: List[bytes] = list()
280                    
281                    # register new service
282                    if new_service:
283                        sys_data: Dict = local_service_class_info()
284                        request: Dict = {
285                            Fields.command_name.value: Commands.declare_service_class.value,
286                            Fields.request_id.value: remote_server.gen_request_id(),
287                            Fields.data.value: sys_data,
288                        }
289                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
290                        messages.append(serialized_request)
291                    
292                    request_obj_type: Type[Request] = type(request_obj)
293
294                    # check request type existance
295                    new_request_type: bool = False
296                    if request_obj_type not in remotely_registered_request_classes:
297                        new_request_type = True
298                        remote_server.register_request_class(request_obj)
299
300                    local_request_class_info: LocalRequestClassInfo = remotely_registered_request_classes[request_obj_type]
301                    
302                    # register new request type
303                    if new_request_type:
304                        sys_data: Dict = local_request_class_info()
305                        request: Dict = {
306                            Fields.command_name.value: Commands.declare_service_request_class.value,
307                            Fields.request_id.value: remote_server.gen_request_id(),
308                            Fields.data.value: sys_data,
309                        }
310                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
311                        messages.append(serialized_request)
312
313                    # - actual request to service
314                    request_data: Dict = local_request_class_info.request_to_data(request_obj)
315                    request_data[CommandDataFieldsServiceRequestWithRequestClass.service_class_id.value] = local_service_class_info.local_id
316                    _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
317                    request: Dict = {
318                        Fields.command_name.value: Commands.send_service_request_with_request_class.value,
319                        Fields.request_id.value: remote_server.gen_request_id(),
320                        Fields.is_response_required.value: 1,
321                        Fields.data_serializer_id.value: serializer_id.value,
322                        Fields.data.value: request_data,
323                    }
324                    
325                    serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
326                    messages.append(serialized_request)
327                    result = b''.join(messages)
328            else:
329                params = {service_type_param_name: 0}
330                found, pos, value = find_arg_position_and_value(service_type_param_name, params, args, kwargs)
331                if found:
332                    if pos is None:
333                        del kwargs[service_type_param_name]
334                    else:
335                        args = args[1:]
336                    
337                    if isinstance(value, type) and issubclass(value, Service):
338                        service_type: Type[Service] = value
339
340                        # check service existance
341                        new_service: bool = False
342                        if service_type not in remotely_registered_service_classes:
343                            new_service = True
344                            remote_server.register_service_class(service_type)
345
346                        local_service_class_info: LocalClassInfo = remotely_registered_service_classes[service_type]
347                        
348                        messages: List[bytes] = list()
349                        
350                        # register new service
351                        if new_service:
352                            sys_data: Dict = local_service_class_info()
353                            request: Dict = {
354                                Fields.command_name.value: Commands.declare_service_class.value,
355                                Fields.request_id.value: remote_server.gen_request_id(),
356                                Fields.data.value: sys_data,
357                            }
358                            serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
359                            messages.append(serialized_request)
360
361                        # actual request to service
362                        request_data: Dict = {
363                            CommandDataFieldsServiceRequest.service_class_id.value: local_service_class_info.local_id,
364                        }
365                        explicit_serializer_required: bool = False
366                        if args:
367                            explicit_serializer_required = True
368                            request_data[CommandDataFieldsServiceRequest.args.value] = args
369                        
370                        if kwargs:
371                            explicit_serializer_required = True
372                            request_data[CommandDataFieldsServiceRequest.kwargs.value] = kwargs
373                        
374                        if explicit_serializer_required:
375                            _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
376                        
377                        request: Dict = {
378                            Fields.command_name.value: Commands.send_service_request.value,
379                            Fields.request_id.value: remote_server.gen_request_id(),
380                            Fields.is_response_required.value: 1,
381                            Fields.data.value: request_data,
382                        }
383                        if explicit_serializer_required:
384                            request[Fields.data_serializer_id.value] = serializer_id.value
385                        
386                        serialized_request: bytes = self.serializer__multi_platform_fast.dumps(request)
387                        messages.append(serialized_request)
388                        result = b''.join(messages)
389                    else:
390                        is_raw_request = True
391                else:
392                    is_raw_request = True
393            
394            if is_raw_request:
395                # raw request
396                request_data: Dict = {
397                    CommandDataFieldsRequest.args.value: args,
398                    CommandDataFieldsRequest.kwargs.value: kwargs,
399                }
400                _, serializer_id, request_data = self.serialize(remote_server.foreign_platform_name, request_data)
401                
402                request: Dict = {
403                    Fields.command_name.value: Commands.send_request.value,
404                    Fields.request_id.value: remote_server.gen_request_id(),
405                    Fields.is_response_required.value: 1,
406                    Fields.data_serializer_id.value: serializer_id.value,
407                    Fields.data.value: request_data,
408                }
409                
410                result = self.serializer__multi_platform_fast.dumps(request)
411        except UnknownArgumentError:
412            raise RuntimeError  # executed only if there is some bug in code
413        
414        return result

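Build the wire message(s) for a request addressed to a registered remote server, declaring not-yet-registered service and request classes first.

Args: server_id (Hashable): key of the target server in the servers mapping. args: positional arguments of the request. kwargs: keyword arguments of the request.

Raises: MessageCanNotBeEmptyError: no arguments were provided. RuntimeError: an internal inconsistency was detected (indicates a bug in the code).

Returns: bytes: the serialized message(s), concatenated.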

def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
416    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
417        return type(self).__name__, {
418            'instances num': len(self.instances),
419        }
def single_task_registration_or_immediate_processing_single(self, *args, **kwargs) -> Tuple[bool, Union[int, NoneType], Any]:
421    def single_task_registration_or_immediate_processing_single(
422            self, *args, **kwargs
423    ) -> Tuple[bool, Optional[CoroID], Any]:
424        number_of_args: int = number_of_provided_args(args, kwargs)
425        if 1 == number_of_args:
426            return self._on_get(*args, **kwargs)
427        elif 2 == number_of_args:
428            return self._on_set(*args, **kwargs)
429        else:
430            return True, None, RuntimeError(f'Wrong number of parameters: {number_of_args}')
def full_processing_iteration(self):
432    def full_processing_iteration(self):
433        results_bak = self.results
434        self.results = type(results_bak)()
435        for waiter_coro_id, instance in results_bak.items():
436            self.register_response(waiter_coro_id, instance)
437
438        self.make_dead()
def in_work(self) -> bool:
440    def in_work(self) -> bool:
441        return self.thrifty_in_work(bool(self.results))

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

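Raises: NotImplementedError: presumably raised by the base implementation when a subclass does not override this method; the override shown above does not raise it itself.

Returns: bool: the value of thrifty_in_work(bool(self.results)) for the override shown above.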

Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.DualImmediateProcessingServiceMixin
single_task_registration_or_immediate_processing
single_task_registration_or_immediate_processing_multiple
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class RemoteNodesRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class RemoteNodesRequest(ServiceRequest):
    """Request builder whose methods store a numeric request-type code plus arguments via ``_save``.

    Codes used: 0 - start, 1 - stop, 2 - connect, 3 - disconnect.
    """

    def start(self, host=None, port=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
        """Save a "start" request (code 0) carrying ``host`` and ``port``.

        ``tls`` and ``stream_manager`` are accepted but are not passed on to ``_save``.
        """
        request_type = 0
        return self._save(request_type, host, port)

    def stop(self) -> ServiceRequest:
        """Save a "stop" request (code 1) without arguments."""
        request_type = 1
        return self._save(request_type)

    def connect(self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
        """Save a "connect" request (code 2) carrying ``host``, ``port`` and ``connection_id_alias``.

        ``tls`` and ``stream_manager`` are accepted but are not passed on to ``_save``.
        """
        request_type = 2
        return self._save(request_type, host, port, connection_id_alias)

    def disconnect(self, connection_id) -> ServiceRequest:
        """Save a "disconnect" request (code 3) carrying ``connection_id``."""
        request_type = 3
        return self._save(request_type, connection_id)
def start( self, host=None, port=None, tls: bool = True, stream_manager: typing.Union[cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, NoneType] = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
75    def start(self, host=None, port=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
76        return self._save(0, host, port)
def stop( self) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
77    def stop(self) -> ServiceRequest:
78        return self._save(1)
def connect( self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: typing.Union[cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, NoneType] = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
79    def connect(self, host=None, port=None, connection_id_alias=None, tls: bool = True, stream_manager: Optional[StreamManagerAbstract] = None) -> ServiceRequest:
80        return self._save(2, host, port, connection_id_alias)
def disconnect( self, connection_id) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
81    def disconnect(self, connection_id) -> ServiceRequest:
82        return self._save(3, connection_id)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'RemoteNodes'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai