cengal.io.named_connections.workers.asyncio_streams.versions.v_0.asyncio_streams
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18""" 19Module Docstring 20Docstrings: http://www.python.org/dev/peps/pep-0257/ 21""" 22 23__author__ = "ButenkoMS <gtalk@butenkoms.space>" 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 26__license__ = "Apache License, Version 2.0" 27__version__ = "4.4.1" 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 29__email__ = "gtalk@butenkoms.space" 30# __status__ = "Prototype" 31__status__ = "Development" 32# __status__ = "Production" 33 34 35__all__ = ['ServerSideClient', 'create_server', 'start_server', 'ServerSideServer', 'CanNotEstablishNamedConnectionError', 'ClientSideClient', 'ClientSideServer'] 36 37 38from typing import Type 39from cengal.parallel_execution.asyncio.efficient_streams import * 40from cengal.io.named_connections.named_connections_manager import * 41from cengal.parallel_execution.asyncio.atasks import create_task 42from cengal.code_flow_control.smart_values import ValueCache, ValueExistence 43from asyncio.tasks import sleep, Task 44from asyncio.events import AbstractServer 45 46 47class ServerSideClient(NamedConnectionsClient): 48 def __init__(self, reader: StreamReaderAbstract, writer: StreamWriterAbstract, server_id: bytes, external_data_full_size: ValueExistence) -> None: 49 super().__init__(server_id, external_data_full_size) 50 self.reader: StreamReaderAbstract = reader 51 self.writer: StreamWriterAbstract = writer 52 53 async def serve(self): 54 """client_connected_cb 55 56 Returns: 57 Any: _description_ 58 """ 59 raise NotImplementedError 60 61 # async def server(reader, writer): 62 # reader: StreamReaderAbstract = reader 63 # writer: StreamWriterAbstract = writer 64 65 # data = await reader.read(100) 66 # message = data.decode() 67 # addr = writer.get_extra_info('peername') 68 69 # print(f"Received {message!r} from {addr!r}") 70 71 # print(f"Send: {message!r}") 72 # # for i in range(100): 73 # # writable_data = data * 1000000 74 # # # print(f'writable_data len: {len(writable_data)}') 75 # # writer.write(writable_data) 76 77 # data_chunk_len = int(cpu_info().l2_cache_size_per_virtual_core / len(data)) 78 # writer.start_aw() 79 # stime = perf_counter() 80 # dtime = 0 81 # return_time = 10 82 # index = 0 83 # while dtime < return_time: 84 # index += 1 85 # if 10 <= index: 86 # writer.owrite(pickle.dumps(PickleEncodableClass())) 87 # index = 0 88 # else: 89 # writer.owrite(marshal.dumps(data * randomized_data_size(data_chunk_len))) 90 91 # await writer.aw_drain_enough() 92 # dtime = perf_counter() - stime 93 94 # await writer.full_drain() 95 96 # print("Close the connection") 97 # writer.close() 98 99 100async def create_server(server_side_client_type: Type[ServerSideClient], named_connections_manager: NamedConnectionsManager, stream_manager: StreamManagerAbstract, *args, **kwargs): 101 async def client_connected_cb_wrapper(reader: StreamReaderAbstract, writer: StreamWriterAbstract): 102 if not stream_manager.try_establish_message_protocol_server_side(reader, writer): 103 raise CanNotEstablishNamedConnectionError 104 105 server_id: bytes = await reader.read_message() 106 named_connection_client: ServerSideClient = server_side_client_type(reader, writer, server_id, named_connections_manager.external_data_full_size) 107 named_connections_manager.register_client(named_connection_client) 108 await named_connection_client.serve() 109 110 server: AbstractServer = stream_manager.start_server(client_connected_cb_wrapper, *args, **kwargs) 111 return server 112 113 114async def start_server(server: AbstractServer): 115 async with server: 116 await server.serve_forever() 117 118 119ServerSideServer = NamedConnectionsServer 120 121 122# ====================================================== 123 124 125class CanNotEstablishNamedConnectionError(Exception): 126 pass 127 128 129ClientSideClient = NamedConnectionsClient 130 131 132class ClientSideServer(NamedConnectionsServer): 133 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None: 134 super().__init__(server_id, named_connections_manager) 135 self._connected: bool = False 136 self._stream_manager: StreamManagerAbstract = stream_manager 137 self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect 138 self._connection_args = connection_args 139 self._connection_kwargs = connection_kwargs 140 self.autonomous_serving_future: Task = None 141 self.reader: StreamReaderAbstract = None 142 self.writer: StreamWriterAbstract = None 143 144 async def serve(self): 145 """Serve 146 147 Returns: 148 Any: _description_ 149 """ 150 raise NotImplementedError 151 152 async def connect(self): 153 if self._connected: 154 return 155 156 reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs) 157 self.reader = reader 158 self.writer = writer 159 if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer): 160 raise CanNotEstablishNamedConnectionError 161 162 writer.optimized_write_message(self.id) 163 if self._need_to_full_drain_on_connect: 164 await writer.full_drain() 165 166 self.autonomous_serving_future = create_task(self.serve) 167 self._connected = True 168 169 async def create_client(self, client_side_client_type: Type[ClientSideClient]): 170 if not self._connected: 171 await self.connect() 172 173 return self.named_connections_manager().create_client(client_side_client_type, self.id)
class
ServerSideClient(cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient):
48class ServerSideClient(NamedConnectionsClient): 49 def __init__(self, reader: StreamReaderAbstract, writer: StreamWriterAbstract, server_id: bytes, external_data_full_size: ValueExistence) -> None: 50 super().__init__(server_id, external_data_full_size) 51 self.reader: StreamReaderAbstract = reader 52 self.writer: StreamWriterAbstract = writer 53 54 async def serve(self): 55 """client_connected_cb 56 57 Returns: 58 Any: _description_ 59 """ 60 raise NotImplementedError 61 62 # async def server(reader, writer): 63 # reader: StreamReaderAbstract = reader 64 # writer: StreamWriterAbstract = writer 65 66 # data = await reader.read(100) 67 # message = data.decode() 68 # addr = writer.get_extra_info('peername') 69 70 # print(f"Received {message!r} from {addr!r}") 71 72 # print(f"Send: {message!r}") 73 # # for i in range(100): 74 # # writable_data = data * 1000000 75 # # # print(f'writable_data len: {len(writable_data)}') 76 # # writer.write(writable_data) 77 78 # data_chunk_len = int(cpu_info().l2_cache_size_per_virtual_core / len(data)) 79 # writer.start_aw() 80 # stime = perf_counter() 81 # dtime = 0 82 # return_time = 10 83 # index = 0 84 # while dtime < return_time: 85 # index += 1 86 # if 10 <= index: 87 # writer.owrite(pickle.dumps(PickleEncodableClass())) 88 # index = 0 89 # else: 90 # writer.owrite(marshal.dumps(data * randomized_data_size(data_chunk_len))) 91 92 # await writer.aw_drain_enough() 93 # dtime = perf_counter() - stime 94 95 # await writer.full_drain() 96 97 # print("Close the connection") 98 # writer.close()
ServerSideClient( reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract, writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract, server_id: bytes, external_data_full_size: cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence)
reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract
writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract
async def
serve(self):
54 async def serve(self): 55 """client_connected_cb 56 57 Returns: 58 Any: _description_ 59 """ 60 raise NotImplementedError
client_connected_cb
Returns: Any: _description_
Inherited Members
- cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient
- id
- server_id
- server_ref
- input_from_client
- output_to_client
- callback__bind
- callback__unbind
- callback__data_to_client_added
- callback__server_ready_for_data
- callback__is_client_ready_for_data
- callback__stop
- data_to_server_added
- is_server_ready_for_data
- client_ready_for_data
- stop
async def
create_server( server_side_client_type: typing.Type[ServerSideClient], named_connections_manager: cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsManager, stream_manager: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, *args, **kwargs):
101async def create_server(server_side_client_type: Type[ServerSideClient], named_connections_manager: NamedConnectionsManager, stream_manager: StreamManagerAbstract, *args, **kwargs): 102 async def client_connected_cb_wrapper(reader: StreamReaderAbstract, writer: StreamWriterAbstract): 103 if not stream_manager.try_establish_message_protocol_server_side(reader, writer): 104 raise CanNotEstablishNamedConnectionError 105 106 server_id: bytes = await reader.read_message() 107 named_connection_client: ServerSideClient = server_side_client_type(reader, writer, server_id, named_connections_manager.external_data_full_size) 108 named_connections_manager.register_client(named_connection_client) 109 await named_connection_client.serve() 110 111 server: AbstractServer = stream_manager.start_server(client_connected_cb_wrapper, *args, **kwargs) 112 return server
async def
start_server(server: asyncio.events.AbstractServer):
ServerSideServer =
<class 'cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer'>
class
CanNotEstablishNamedConnectionError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
ClientSideClient =
<class 'cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient'>
class
ClientSideServer(cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer):
133class ClientSideServer(NamedConnectionsServer): 134 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None: 135 super().__init__(server_id, named_connections_manager) 136 self._connected: bool = False 137 self._stream_manager: StreamManagerAbstract = stream_manager 138 self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect 139 self._connection_args = connection_args 140 self._connection_kwargs = connection_kwargs 141 self.autonomous_serving_future: Task = None 142 self.reader: StreamReaderAbstract = None 143 self.writer: StreamWriterAbstract = None 144 145 async def serve(self): 146 """Serve 147 148 Returns: 149 Any: _description_ 150 """ 151 raise NotImplementedError 152 153 async def connect(self): 154 if self._connected: 155 return 156 157 reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs) 158 self.reader = reader 159 self.writer = writer 160 if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer): 161 raise CanNotEstablishNamedConnectionError 162 163 writer.optimized_write_message(self.id) 164 if self._need_to_full_drain_on_connect: 165 await writer.full_drain() 166 167 self.autonomous_serving_future = create_task(self.serve) 168 self._connected = True 169 170 async def create_client(self, client_side_client_type: Type[ClientSideClient]): 171 if not self._connected: 172 await self.connect() 173 174 return self.named_connections_manager().create_client(client_side_client_type, self.id)
ClientSideServer( server_id: bytes, named_connections_manager: cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsManager, stream_manager: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs)
134 def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None: 135 super().__init__(server_id, named_connections_manager) 136 self._connected: bool = False 137 self._stream_manager: StreamManagerAbstract = stream_manager 138 self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect 139 self._connection_args = connection_args 140 self._connection_kwargs = connection_kwargs 141 self.autonomous_serving_future: Task = None 142 self.reader: StreamReaderAbstract = None 143 self.writer: StreamWriterAbstract = None
reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract
writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract
async def
serve(self):
145 async def serve(self): 146 """Serve 147 148 Returns: 149 Any: _description_ 150 """ 151 raise NotImplementedError
Serve
Returns: Any: _description_
async def
connect(self):
153 async def connect(self): 154 if self._connected: 155 return 156 157 reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs) 158 self.reader = reader 159 self.writer = writer 160 if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer): 161 raise CanNotEstablishNamedConnectionError 162 163 writer.optimized_write_message(self.id) 164 if self._need_to_full_drain_on_connect: 165 await writer.full_drain() 166 167 self.autonomous_serving_future = create_task(self.serve) 168 self._connected = True
async def
create_client( self, client_side_client_type: typing.Type[cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient]):
Inherited Members
- cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer
- id
- named_connections_manager
- bind_clients
- bind
- unbind
- callback__data_to_server_added
- callback__is_server_ready_for_data
- callback__client_ready_for_data
- callback__client_stopped