cengal.io.named_connections.workers.asyncio_streams.versions.v_0.asyncio_streams

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18"""
 19Module Docstring
 20Docstrings: http://www.python.org/dev/peps/pep-0257/
 21"""
 22
 23__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 26__license__ = "Apache License, Version 2.0"
 27__version__ = "4.4.1"
 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 29__email__ = "gtalk@butenkoms.space"
 30# __status__ = "Prototype"
 31__status__ = "Development"
 32# __status__ = "Production"
 33
 34
 35__all__ = ['ServerSideClient', 'create_server', 'start_server', 'ServerSideServer', 'CanNotEstablishNamedConnectionError', 'ClientSideClient', 'ClientSideServer']
 36
 37
 38from typing import Type
 39from cengal.parallel_execution.asyncio.efficient_streams import *
 40from cengal.io.named_connections.named_connections_manager import *
 41from cengal.parallel_execution.asyncio.atasks import create_task
 42from cengal.code_flow_control.smart_values import ValueCache, ValueExistence
 43from asyncio.tasks import sleep, Task
 44from asyncio.events import AbstractServer
 45
 46
 47class ServerSideClient(NamedConnectionsClient):
 48    def __init__(self, reader: StreamReaderAbstract, writer: StreamWriterAbstract, server_id: bytes, external_data_full_size: ValueExistence) -> None:
 49        super().__init__(server_id, external_data_full_size)
 50        self.reader: StreamReaderAbstract = reader
 51        self.writer: StreamWriterAbstract = writer
 52    
 53    async def serve(self):
 54        """client_connected_cb
 55
 56        Returns:
 57            Any: _description_
 58        """        
 59        raise NotImplementedError
 60    
 61    # async def server(reader, writer):
 62    #     reader: StreamReaderAbstract = reader
 63    #     writer: StreamWriterAbstract = writer
 64
 65    #     data = await reader.read(100)
 66    #     message = data.decode()
 67    #     addr = writer.get_extra_info('peername')
 68
 69    #     print(f"Received {message!r} from {addr!r}")
 70
 71    #     print(f"Send: {message!r}")
 72    #     # for i in range(100):
 73    #     #     writable_data = data * 1000000
 74    #     #     # print(f'writable_data len: {len(writable_data)}')
 75    #     #     writer.write(writable_data)
 76
 77    #     data_chunk_len = int(cpu_info().l2_cache_size_per_virtual_core / len(data))
 78    #     writer.start_aw()
 79    #     stime = perf_counter()
 80    #     dtime = 0
 81    #     return_time = 10
 82    #     index = 0
 83    #     while dtime < return_time:
 84    #         index += 1
 85    #         if 10 <= index:
 86    #             writer.owrite(pickle.dumps(PickleEncodableClass()))
 87    #             index = 0
 88    #         else:
 89    #             writer.owrite(marshal.dumps(data * randomized_data_size(data_chunk_len)))
 90
 91    #         await writer.aw_drain_enough()
 92    #         dtime = perf_counter() - stime
 93        
 94    #     await writer.full_drain()
 95
 96    #     print("Close the connection")
 97    #     writer.close()
 98
 99
100async def create_server(server_side_client_type: Type[ServerSideClient], named_connections_manager: NamedConnectionsManager, stream_manager: StreamManagerAbstract, *args, **kwargs):
101    async def client_connected_cb_wrapper(reader: StreamReaderAbstract, writer: StreamWriterAbstract):
102        if not stream_manager.try_establish_message_protocol_server_side(reader, writer):
103            raise CanNotEstablishNamedConnectionError
104        
105        server_id: bytes = await reader.read_message()
106        named_connection_client: ServerSideClient = server_side_client_type(reader, writer, server_id, named_connections_manager.external_data_full_size)
107        named_connections_manager.register_client(named_connection_client)
108        await named_connection_client.serve()
109
110    server: AbstractServer = stream_manager.start_server(client_connected_cb_wrapper, *args, **kwargs)
111    return server
112
113
114async def start_server(server: AbstractServer):
115    async with server:
116        await server.serve_forever()
117
118
119ServerSideServer = NamedConnectionsServer
120
121
122# ======================================================
123
124
125class CanNotEstablishNamedConnectionError(Exception):
126    pass
127
128
129ClientSideClient = NamedConnectionsClient
130
131
132class ClientSideServer(NamedConnectionsServer):
133    def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None:
134        super().__init__(server_id, named_connections_manager)
135        self._connected: bool = False
136        self._stream_manager: StreamManagerAbstract = stream_manager
137        self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect
138        self._connection_args = connection_args
139        self._connection_kwargs = connection_kwargs
140        self.autonomous_serving_future: Task = None
141        self.reader: StreamReaderAbstract = None
142        self.writer: StreamWriterAbstract = None
143    
144    async def serve(self):
145        """Serve
146
147        Returns:
148            Any: _description_
149        """        
150        raise NotImplementedError
151    
152    async def connect(self):
153        if self._connected:
154            return
155        
156        reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs)
157        self.reader = reader
158        self.writer = writer
159        if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer):
160            raise CanNotEstablishNamedConnectionError
161        
162        writer.optimized_write_message(self.id)
163        if self._need_to_full_drain_on_connect:
164            await writer.full_drain()
165        
166        self.autonomous_serving_future = create_task(self.serve)
167        self._connected = True
168    
169    async def create_client(self, client_side_client_type: Type[ClientSideClient]):
170        if not self._connected:
171            await self.connect()
172        
173        return self.named_connections_manager().create_client(client_side_client_type, self.id)
class ServerSideClient(cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient):
48class ServerSideClient(NamedConnectionsClient):
49    def __init__(self, reader: StreamReaderAbstract, writer: StreamWriterAbstract, server_id: bytes, external_data_full_size: ValueExistence) -> None:
50        super().__init__(server_id, external_data_full_size)
51        self.reader: StreamReaderAbstract = reader
52        self.writer: StreamWriterAbstract = writer
53    
54    async def serve(self):
55        """client_connected_cb
56
57        Returns:
58            Any: _description_
59        """        
60        raise NotImplementedError
61    
62    # async def server(reader, writer):
63    #     reader: StreamReaderAbstract = reader
64    #     writer: StreamWriterAbstract = writer
65
66    #     data = await reader.read(100)
67    #     message = data.decode()
68    #     addr = writer.get_extra_info('peername')
69
70    #     print(f"Received {message!r} from {addr!r}")
71
72    #     print(f"Send: {message!r}")
73    #     # for i in range(100):
74    #     #     writable_data = data * 1000000
75    #     #     # print(f'writable_data len: {len(writable_data)}')
76    #     #     writer.write(writable_data)
77
78    #     data_chunk_len = int(cpu_info().l2_cache_size_per_virtual_core / len(data))
79    #     writer.start_aw()
80    #     stime = perf_counter()
81    #     dtime = 0
82    #     return_time = 10
83    #     index = 0
84    #     while dtime < return_time:
85    #         index += 1
86    #         if 10 <= index:
87    #             writer.owrite(pickle.dumps(PickleEncodableClass()))
88    #             index = 0
89    #         else:
90    #             writer.owrite(marshal.dumps(data * randomized_data_size(data_chunk_len)))
91
92    #         await writer.aw_drain_enough()
93    #         dtime = perf_counter() - stime
94        
95    #     await writer.full_drain()
96
97    #     print("Close the connection")
98    #     writer.close()
ServerSideClient( reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract, writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract, server_id: bytes, external_data_full_size: cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence)
49    def __init__(self, reader: StreamReaderAbstract, writer: StreamWriterAbstract, server_id: bytes, external_data_full_size: ValueExistence) -> None:
50        super().__init__(server_id, external_data_full_size)
51        self.reader: StreamReaderAbstract = reader
52        self.writer: StreamWriterAbstract = writer
reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract
writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract
async def serve(self):
54    async def serve(self):
55        """client_connected_cb
56
57        Returns:
58            Any: _description_
59        """        
60        raise NotImplementedError

client_connected_cb

Returns: Any: _description_

Inherited Members
cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient
id
server_id
server_ref
input_from_client
output_to_client
callback__bind
callback__unbind
callback__data_to_client_added
callback__server_ready_for_data
callback__is_client_ready_for_data
callback__stop
data_to_server_added
is_server_ready_for_data
client_ready_for_data
stop
async def create_server( server_side_client_type: typing.Type[ServerSideClient], named_connections_manager: cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsManager, stream_manager: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, *args, **kwargs):
101async def create_server(server_side_client_type: Type[ServerSideClient], named_connections_manager: NamedConnectionsManager, stream_manager: StreamManagerAbstract, *args, **kwargs):
102    async def client_connected_cb_wrapper(reader: StreamReaderAbstract, writer: StreamWriterAbstract):
103        if not stream_manager.try_establish_message_protocol_server_side(reader, writer):
104            raise CanNotEstablishNamedConnectionError
105        
106        server_id: bytes = await reader.read_message()
107        named_connection_client: ServerSideClient = server_side_client_type(reader, writer, server_id, named_connections_manager.external_data_full_size)
108        named_connections_manager.register_client(named_connection_client)
109        await named_connection_client.serve()
110
111    server: AbstractServer = stream_manager.start_server(client_connected_cb_wrapper, *args, **kwargs)
112    return server
async def start_server(server: asyncio.events.AbstractServer):
115async def start_server(server: AbstractServer):
116    async with server:
117        await server.serve_forever()
ServerSideServer = <class 'cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer'>
class CanNotEstablishNamedConnectionError(builtins.Exception):
126class CanNotEstablishNamedConnectionError(Exception):
127    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
ClientSideClient = <class 'cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient'>
class ClientSideServer(cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer):
133class ClientSideServer(NamedConnectionsServer):
134    def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None:
135        super().__init__(server_id, named_connections_manager)
136        self._connected: bool = False
137        self._stream_manager: StreamManagerAbstract = stream_manager
138        self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect
139        self._connection_args = connection_args
140        self._connection_kwargs = connection_kwargs
141        self.autonomous_serving_future: Task = None
142        self.reader: StreamReaderAbstract = None
143        self.writer: StreamWriterAbstract = None
144    
145    async def serve(self):
146        """Serve
147
148        Returns:
149            Any: _description_
150        """        
151        raise NotImplementedError
152    
153    async def connect(self):
154        if self._connected:
155            return
156        
157        reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs)
158        self.reader = reader
159        self.writer = writer
160        if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer):
161            raise CanNotEstablishNamedConnectionError
162        
163        writer.optimized_write_message(self.id)
164        if self._need_to_full_drain_on_connect:
165            await writer.full_drain()
166        
167        self.autonomous_serving_future = create_task(self.serve)
168        self._connected = True
169    
170    async def create_client(self, client_side_client_type: Type[ClientSideClient]):
171        if not self._connected:
172            await self.connect()
173        
174        return self.named_connections_manager().create_client(client_side_client_type, self.id)
ClientSideServer( server_id: bytes, named_connections_manager: cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsManager, stream_manager: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs)
134    def __init__(self, server_id: bytes, named_connections_manager: 'NamedConnectionsManager', stream_manager: StreamManagerAbstract, need_to_full_drain_on_connect: bool, *connection_args, **connection_kwargs) -> None:
135        super().__init__(server_id, named_connections_manager)
136        self._connected: bool = False
137        self._stream_manager: StreamManagerAbstract = stream_manager
138        self._need_to_full_drain_on_connect: bool = need_to_full_drain_on_connect
139        self._connection_args = connection_args
140        self._connection_kwargs = connection_kwargs
141        self.autonomous_serving_future: Task = None
142        self.reader: StreamReaderAbstract = None
143        self.writer: StreamWriterAbstract = None
autonomous_serving_future: _asyncio.Task
reader: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract
writer: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract
async def serve(self):
145    async def serve(self):
146        """Serve
147
148        Returns:
149            Any: _description_
150        """        
151        raise NotImplementedError

Serve

Returns: Any: _description_

async def connect(self):
153    async def connect(self):
154        if self._connected:
155            return
156        
157        reader, writer = self._stream_manager.open_connection(*self._connection_args, **self._connection_kwargs)
158        self.reader = reader
159        self.writer = writer
160        if not self._stream_manager.try_establish_message_protocol_client_side(reader, writer):
161            raise CanNotEstablishNamedConnectionError
162        
163        writer.optimized_write_message(self.id)
164        if self._need_to_full_drain_on_connect:
165            await writer.full_drain()
166        
167        self.autonomous_serving_future = create_task(self.serve)
168        self._connected = True
async def create_client( self, client_side_client_type: typing.Type[cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsClient]):
170    async def create_client(self, client_side_client_type: Type[ClientSideClient]):
171        if not self._connected:
172            await self.connect()
173        
174        return self.named_connections_manager().create_client(client_side_client_type, self.id)
Inherited Members
cengal.io.named_connections.named_connections_manager.versions.v_0.named_connections_manager.NamedConnectionsServer
id
named_connections_manager
bind_clients
bind
unbind
callback__data_to_server_added
callback__is_server_ready_for_data
callback__client_ready_for_data
callback__client_stopped