cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.tcp_efficient_streams

Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19"""
  20Module Docstring
  21Docstrings: http://www.python.org/dev/peps/pep-0257/
  22"""
  23
  24
  25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
  26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
  27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
  28__license__ = "Apache License, Version 2.0"
  29__version__ = "4.4.1"
  30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
  31__email__ = "gtalk@butenkoms.space"
  32# __status__ = "Prototype"
  33__status__ = "Development"
  34# __status__ = "Production"
  35
  36
  37__all__ = ['StreamType', 'GateSecurityPolicy', 'StreamManagerIOCoreMemoryManagement', 'TcpStreamManager', 'TcpStreamReader', 'TcpStreamReaderProtocol', 'TcpStreamWriter']
  38
  39
  40import warnings
  41import asyncio
  42from asyncio.exceptions import IncompleteReadError, LimitOverrunError
  43from asyncio import streams
  44from asyncio.streams import StreamWriter as OriginalStreamWriter
  45from asyncio.streams import StreamReader as OriginalStreamReader
  46from asyncio.streams import StreamReaderProtocol as OriginalStreamReaderProtocol
  47from asyncio.sslproto import SSLProtocol, _SSLProtocolTransport
  48from asyncio.proactor_events import _ProactorSocketTransport
  49from asyncio.selector_events import _SelectorSocketTransport
  50from asyncio import events
  51from asyncio import proactor_events
  52from asyncio import selector_events
  53try:
  54    from asyncio import unix_events
  55except ImportError:
  56    pass
  57
  58from asyncio import coroutines
  59from asyncio.tasks import sleep, Task
  60from asyncio.futures import Future
  61from copy import copy
  62from enum import Enum
  63from cengal.io.asock_io.versions.v_1.recv_buff_size_computer.recv_buff_size_computer__python import RecvBuffSizeComputer
  64# from cengal.io.asock_io.versions.v_1.base import IOCoreMemoryManagement
  65from cengal.parallel_execution.asyncio.atasks import create_task
  66from cengal.parallel_execution.asyncio.timed_yield import TimedYield
  67from cengal.hardware.info.cpu import cpu_info
  68# from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl
  69# from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty
  70# from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType
  71from cengal.data_containers.dynamic_list_of_pieces.versions.v_1.dynamic_list_of_pieces__python import DynamicListOfPiecesDequeWithLengthControl
  72from cengal.code_flow_control.smart_values.versions import ValueExistence
  73from cengal.data_manipulation.conversion.reinterpret_cast import reinterpret_cast
  74from typing import Sequence, Tuple, Type, Set, Optional, Union, List, cast
  75from .efficient_streams_base_internal import *
  76from .efficient_streams_base import *
  77from .efficient_streams_abstract import *
  78
  79from contextlib import asynccontextmanager
  80
  81
  82class TcpStreamManager(StreamManagerAbstract):
  83    def __init__(self) -> None:
  84        self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement()
  85        self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0
  86        self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl
  87        self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl
  88
  89    async def open_connection(self, host=None, port=None, *,
  90                            loop=None, limit=DEFAULT_LIMIT,
  91                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
  92                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
  93                            max_message_size_len: Optional[int] = None,
  94                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
  95        """A wrapper for create_connection() returning a (reader, writer) pair.
  96
  97        The reader returned is a TcpStreamReader instance; the writer is a
  98        TcpStreamWriter instance.
  99
 100        The arguments are all the usual arguments to create_connection()
 101        except protocol_factory; most common are positional host and port,
 102        with various optional keyword arguments following.
 103
 104        Additional optional keyword arguments are loop (to set the event loop
 105        instance to use) and limit (to set the buffer limit passed to the
 106        TcpStreamReader).
 107
 108        (If you want to customize the TcpStreamReader and/or
 109        TcpStreamReaderProtocol classes, just copy the code -- there's
 110        really nothing special here except some convenience.)
 111        """
 112        if StreamType.gate == stream_type:
 113            raise ValueError('Wrong stream_type value: a client cannot be a "gate".')
 114        
 115        if loop is None:
 116            loop = events.get_event_loop()
 117        else:
 118            warnings.warn("The loop argument is deprecated since Python 3.8, "
 119                        "and scheduled for removal in Python 3.10.",
 120                        DeprecationWarning, stacklevel=2)
 121        
 122        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
 123        reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
 124        protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
 125        transport, _ = await loop.create_connection(
 126            lambda: protocol, host, port, **kwds)
 127        writer = TcpStreamWriter(transport, protocol, reader, loop)
 128        return reader, writer
 129    
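    # Usage sketch (editor's illustration, not part of the original module; host and
    # port are placeholders and assume a reachable TCP server):
    #
    #     async def client_example():
    #         manager = TcpStreamManager()
    #         reader, writer = await manager.open_connection('127.0.0.1', 8888)
    #         writer.write(b'ping')
    #         await writer.drain()
    #         reply = await reader.read(1024)
    #         writer.close()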
 130    def bind_existing_connection(self, reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT,
 131                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 132                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 133                            max_message_size_len: Optional[int] = None,
 134                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
 135        reader = reinterpret_cast(TcpStreamReader, reader)
 136        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
 137        reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop)
 138        
 139        protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol)
 140        protocol._bind_to_stream_manager(message_protocol_settings, reader, loop=loop)
 141        
 142        transport = writer._transport
 143        if isinstance(transport, _SSLProtocolTransport):
 144            transport = cast(_SSLProtocolTransport, transport)
 145            ssl_protocol: SSLProtocol = transport._ssl_protocol
 146            ssl_protocol._set_app_protocol(protocol)
 147        elif isinstance(transport, _ProactorSocketTransport):
 148            transport = cast(_ProactorSocketTransport, transport)
 149            transport.set_protocol(protocol)
 150        elif isinstance(transport, _SelectorSocketTransport):
 151            transport = cast(_SelectorSocketTransport, transport)
 152            transport.set_protocol(protocol)
 153        else:
 154            raise RuntimeError(f'Unsupported transport type: {type(transport)}')
 155
 156        writer = reinterpret_cast(TcpStreamWriter, writer)
 157        writer._bind_to_stream_manager(self, transport, protocol, reader, loop)
 158
 159        return reader, writer
 160
 161    async def start_server(self, client_connected_cb, host=None, port=None, *,
 162                        loop=None, limit=DEFAULT_LIMIT,
 163                        stream_type: StreamType = StreamType.general, stream_name: str = str(),
 164                        gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
 165                        protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 166                        max_message_size_len: Optional[int] = None,
 167                        **kwds):
 168        """Start a socket server, call back for each client connected.
 169
 170        The first parameter, `client_connected_cb`, takes two parameters:
 171        client_reader, client_writer.  client_reader is a TcpStreamReader
 172        object, while client_writer is a TcpStreamWriter object.  This
 173        parameter can either be a plain callback function or a coroutine;
 174        if it is a coroutine, it will be automatically converted into a
 175        Task.
 176
 177        The rest of the arguments are all the usual arguments to
 178        loop.create_server() except protocol_factory; most common are
 179        positional host and port, with various optional keyword arguments
 180        following.
 181
 182        Additional optional keyword arguments are loop (to set the event loop
 183        instance to use) and limit (to set the buffer limit passed to the
 184        TcpStreamReader).
 185
 186        The return value is the same as loop.create_server(), i.e. a
 187        Server object which can be used to stop the service.
 188        """
 189        if loop is None:
 190            loop = events.get_event_loop()
 191        else:
 192            warnings.warn("The loop argument is deprecated since Python 3.8, "
 193                        "and scheduled for removal in Python 3.10.",
 194                        DeprecationWarning, stacklevel=2)
 195
 196        def factory():
 197            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
 198            reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
 199            protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
 200                                            loop=loop)
 201            return protocol
 202
 203        return await loop.create_server(factory, host, port, **kwds)
 204
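    # Usage sketch (editor's illustration, not part of the original module): an echo
    # server built on start_server(); the callback receives a TcpStreamReader and a
    # TcpStreamWriter:
    #
    #     async def on_client(reader, writer):
    #         data = await reader.read_max()
    #         writer.write(data)
    #         await writer.drain()
    #         writer.close()
    #
    #     async def serve_example():
    #         manager = TcpStreamManager()
    #         server = await manager.start_server(on_client, '127.0.0.1', 8888)
    #         async with server:
    #             await server.serve_forever()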
 205    def bind_accepted_connection(self, 
 206                            reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT,
 207                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 208                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 209                            max_message_size_len: Optional[int] = None,
 210                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
 211        reader = reinterpret_cast(TcpStreamReader, reader)
 212        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
 213        reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop)
 214        
 215        protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol)
 216        client_connected_cb = protocol._client_connected_cb
 217        protocol._bind_to_stream_manager(message_protocol_settings, reader, client_connected_cb, loop=loop)
 218        
 219        transport = writer._transport
 220        if isinstance(transport, _SSLProtocolTransport):
 221            transport = cast(_SSLProtocolTransport, transport)
 222            ssl_protocol: SSLProtocol = transport._ssl_protocol
 223            ssl_protocol._set_app_protocol(protocol)
 224        elif isinstance(transport, _ProactorSocketTransport):
 225            transport = cast(_ProactorSocketTransport, transport)
 226            transport.set_protocol(protocol)
 227        elif isinstance(transport, _SelectorSocketTransport):
 228            transport = cast(_SelectorSocketTransport, transport)
 229            transport.set_protocol(protocol)
 230        else:
 231            raise RuntimeError(f'Unsupported transport type: {type(transport)}')
 232
 233        writer = reinterpret_cast(TcpStreamWriter, writer)
 234        writer._bind_to_stream_manager(self, transport, protocol, reader, loop)
 235
 236        return reader, writer
 237
 238    async def try_establish_message_protocol_server_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool:
 239        message_size_len_encoded = await reader.readonly_exactly(1)
 240        message_size_len = int.from_bytes(message_size_len_encoded, 'little')
 241        can_be_established: bool = False
 242        if message_size_len <= reader._message_protocol_settings.max_message_size_len:
 243            message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:]
 244            message_size = int.from_bytes(message_size_encoded, 'little')
 245            message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:]
 246            if message == reader._message_protocol_settings.protocol_greeting:
 247                can_be_established = True
 248        
 249        if can_be_established:
 250            reader._message_protocol_settings.message_size_len = message_size_len
 251            writer._protocol._message_protocol_settings.message_size_len = message_size_len
 252            await reader.readexactly(1)
 253            await reader.read_message()
 254            message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting
 255            writer.write(message)
 256            await writer.full_drain()
 257            return True
 258        else:
 259            return False
 260    
 261    async def try_establish_message_protocol_client_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool:
 262        message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting
 263        writer.write(message)
 264        await writer.full_drain()
 265        message_size_len_encoded = await reader.readonly_exactly(1)
 266        message_size_len = int.from_bytes(message_size_len_encoded, 'little')
 267        can_be_established: bool = False
 268        if message_size_len <= reader._message_protocol_settings.max_message_size_len:
 269            message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:]
 270            message_size = int.from_bytes(message_size_encoded, 'little')
 271            message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:]
 272            if message == reader._message_protocol_settings.protocol_greeting:
 273                can_be_established = True
 274        
 275        if can_be_established:
 276            reader._message_protocol_settings.message_size_len = message_size_len
 277            writer._protocol._message_protocol_settings.message_size_len = message_size_len
 278            await reader.readexactly(1)
 279            await reader.read_message()
 280            return True
 281        else:
 282            return False
 283
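    # Greeting frame layout used by both try_establish_* methods above, reconstructed
    # from the code (an editor's sketch, not a normative spec):
    #
    #     byte 0                        message_size_len, little-endian
    #     next message_size_len bytes   len(protocol_greeting), little-endian
    #     remaining bytes               protocol_greeting itself
    #
    #     def encode_greeting(greeting: bytes, message_size_len: int) -> bytes:
    #         return (message_size_len.to_bytes(1, 'little')
    #                 + len(greeting).to_bytes(message_size_len, 'little')
    #                 + greeting)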
 284
 285# def classes_with_amax_size() -> Tuple[Type]:
 286#     types = list()
 287#     try:
 288#         pass
 289#     except AttributeError:
 290#         pass
 291
 292
 293# class StreamReaderCopy(OriginalStreamReader):
 294#     # def __init__(self, limit: int, loop: events.AbstractEventLoop) -> None:
 295#     #     self.stream_manager = None
 296#     #     super().__init__(limit, loop)
 297    
 298#     def __init__(self, manager: TcpStreamManager, original_stream_reader: OriginalStreamReader) -> None:
 299#         self._stream_manager = manager
 300#         self.recv_buff_size_computer = RecvBuffSizeComputer()
 301#         cpu_info_inst = cpu_info()
 302#         # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
 303#         # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
 304#         # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
 305#         # self.recv_buff_size_computer.max_recv_buff_size = 3145728
 306#         self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
 307#         # self.recv_buff_size_computer.max_recv_buff_size = 1024
 308#         print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
 309#         original_dict: dict = copy(original_stream_reader.__dict__)
 310#         original_dict.pop('feed_data', None)
 311#         original_dict.pop('_stream_manager', None)
 312#         self.__dict__.update(original_dict)
 313
 314#     def feed_data(self, data):
 315#         assert not self._eof, 'feed_data after feed_eof'
 316
 317#         if not data:
 318#             return
 319
 320#         data_len = len(data)
 321#         self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
 322#         self._buffer.extend(data)
 323#         self._wakeup_waiter()
 324
 325#         if (self._transport is not None and
 326#                 not self._paused and
 327#                 len(self._buffer) > 2 * self._limit):
 328#             try:
 329#                 self._transport.pause_reading()
 330#             except NotImplementedError:
 331#                 # The transport can't be paused.
 332#                 # We'll just have to buffer all data.
 333#                 # Forget the transport so we don't keep trying.
 334#                 self._transport = None
 335#             else:
 336#                 self._paused = True
 337
 338#     def _maybe_resume_transport(self):
 339#         if isinstance(self._transport, (
 340#             proactor_events._ProactorDatagramTransport,
 341#             selector_events._SelectorTransport,
 342#             unix_events._UnixReadPipeTransport
 343#             )):
 344#             # if hasattr(self._transport, 'max_size'):
 345#             try:
 346#                 self._transport.max_size = self.recv_buff_size_computer.recv_buff_size
 347#                 # print(f'max_size: {self._transport.max_size}')
 348#             except AttributeError:
 349#                 pass
 350#         else:
 351#             print(f'Unsupported transport: {type(self._transport)}')
 352        
 353#         if self._paused and len(self._buffer) <= self._limit:
 354#             self._paused = False
 355#             self._transport.resume_reading()
 356    
 357#     async def read_with_counter(self):
 358#         if self._exception is not None:
 359#             raise self._exception
 360
 361#         # This used to just loop creating a new waiter hoping to
 362#         # collect everything in self._buffer, but that would
 363#         # deadlock if the subprocess sends more than self.limit
 364#         # bytes.  So just call self.read(self._limit) until EOF.
 365#         blocks = []
 366#         counter = 0
 367#         while True:
 368#             block = await self.read(self._limit)
 369#             counter += 1
 370#             if not block:
 371#                 break
 372#             blocks.append(block)
 373#         return b''.join(blocks), counter
 374
 375
 376class TcpStreamReader(OriginalStreamReader, StreamReaderAbstract):
 377    def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
 378        self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs)
 379
 380    def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
 381        super().__init__(*args, **kwargs)
 382        self._stream_manager = manager
 383        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
 384        self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type(
 385            external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size)
 386        self.recv_buff_size_computer = RecvBuffSizeComputer()
 387        cpu_info_inst = cpu_info()
 388        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
 389        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
 390        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
 391        # self.recv_buff_size_computer.max_recv_buff_size = 3145728
 392        self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
 393        # self.recv_buff_size_computer.max_recv_buff_size = 1024
 394        # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
 395        self.limit_by_limit: bool = True
 396        self.limit_by_global_in__data_size_limit: bool = True
 397    
 398    async def read_max(self):
 399        return await self.read(self._limit)
 400    
 401    async def read_nearly_max(self):
 402        return await self.read_nearly(self._limit)
 403    
 404    async def read_with_counter(self):
 405        if self._exception is not None:
 406            raise self._exception
 407
 408        # This used to just loop creating a new waiter hoping to
 409        # collect everything in self._buffer, but that would
 410        # deadlock if the subprocess sends more than self.limit
 411        # bytes.  So just call self.read(self._limit) until EOF.
 412        blocks = []
 413        counter = 0
 414        while True:
 415            block = await self.read_max()
 416            counter += 1
 417            if not block:
 418                break
 419            blocks.append(block)
 420        return b''.join(blocks), counter
 421
 422    def __repr__(self):
 423        info = ['TcpStreamReader']
 424        if self._smart_buffer.size():
 425            info.append(f'{self._smart_buffer.size()} bytes')
 426        if self._eof:
 427            info.append('eof')
 428        if self._limit != DEFAULT_LIMIT:
 429            info.append(f'limit={self._limit}')
 430        if self._waiter:
 431            info.append(f'waiter={self._waiter!r}')
 432        if self._exception:
 433            info.append(f'exception={self._exception!r}')
 434        if self._transport:
 435            info.append(f'transport={self._transport!r}')
 436        if self._paused:
 437            info.append('paused')
 438        return '<{}>'.format(' '.join(info))
 439
 440    def _maybe_resume_transport(self):
 441        if isinstance(self._transport, (
 442            proactor_events._ProactorDatagramTransport,
 443            selector_events._SelectorTransport,
 444            unix_events._UnixReadPipeTransport
 445            )):
 446            # if hasattr(self._transport, 'max_size'):
 447            try:
 448                self._transport.max_size = self.recv_buff_size_computer.recv_buff_size
 449                # print(f'max_size: {self._transport.max_size}')
 450            except AttributeError:
 451                pass
 452        else:
 453            print(f'Unsupported transport: {type(self._transport)}')
 454        
 455        if self._paused \
 456            and (
 457                ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \
 458                or (self.limit_by_limit and (not self._limit)) \
 459                or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \
 460                or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \
 461                or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value))
 462            ):
 463            self._paused = False
 464            self._transport.resume_reading()
 465
 466    def at_eof(self):
 467        """Return True if the buffer is empty and 'feed_eof' was called."""
 468        return self._eof and not self._smart_buffer.size()
 469
 470    def feed_data(self, data):
 471        assert not self._eof, 'feed_data after feed_eof'
 472
 473        if not data:
 474            return
 475
 476        data_len = len(data)
 477        self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
 478        self._smart_buffer.add_piece_of_data(data)
 479        self._wakeup_waiter()
 480
 481        if (self._transport is not None and
 482                not self._paused 
 483                and (
 484                    (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit)
 485                    or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value)))
 486                )):
 487            try:
 488                self._transport.pause_reading()
 489            except NotImplementedError:
 490                # The transport can't be paused.
 491                # We'll just have to buffer all data.
 492                # Forget the transport so we don't keep trying.
 493                self._transport = None
 494            else:
 495                self._paused = True
 496
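    # Backpressure summary for feed_data()/_maybe_resume_transport() (editor's note,
    # paraphrasing the conditions above): reading is paused once the smart buffer
    # exceeds 2 * limit or the global inbound memory budget is exhausted, and resumed
    # only after consumers drain back under the corresponding threshold:
    #
    #     pause:  size() > 2 * limit   or  global_in full size > global_in limit
    #     resume: size() <= limit      or  global_in full size <= global_in limit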
 497    async def readline(self):
 498        """Read chunk of data from the stream until newline (b'\n') is found.
 499
 500        On success, return chunk that ends with newline. If only partial
 501        line can be read due to EOF, return incomplete line without
 502        terminating newline. When EOF was reached while no bytes read, empty
 503        bytes object is returned.
 504
 505        If limit is reached, ValueError will be raised. In that case, if
 506        newline was found, complete line including newline will be removed
 507        from internal buffer. Else, internal buffer will be cleared. Limit is
 508        compared against part of the line without newline.
 509
 510        If stream was paused, this function will automatically resume it if
 511        needed.
 512        """
 513        sep = b'\n'
 514        seplen = len(sep)
 515        try:
 516            line = await self.readuntil(sep)
 517        except IncompleteReadError as e:
 518            return e.partial
 519        except LimitOverrunError as e:
 520            if self._smart_buffer.startswith(sep, e.consumed):
 521                self._smart_buffer.get_data(e.consumed + seplen)
 522            else:
 523                self._smart_buffer.clear()
 524            
 525            self._maybe_resume_transport()
 526            raise ValueError(e.args[0])
 527        return line
 528
 529    async def readuntil(self, separator=b'\n'):
 530        """Read data from the stream until ``separator`` is found.
 531
 532        On success, the data and separator will be removed from the
 533        internal buffer (consumed). Returned data will include the
 534        separator at the end.
 535
 536        Configured stream limit is used to check result. Limit sets the
 537        maximal length of data that can be returned, not counting the
 538        separator.
 539
 540        If an EOF occurs and the complete separator is still not found,
 541        an IncompleteReadError exception will be raised, and the internal
 542        buffer will be reset.  The IncompleteReadError.partial attribute
 543        may contain a portion of the separator.
 544
 545        If the data cannot be read because the limit would be exceeded, a
 546        LimitOverrunError exception will be raised, and the data
 547        will be left in the internal buffer, so it can be read again.
 548        """
 549        seplen = len(separator)
 550        if seplen == 0:
 551            raise ValueError('Separator should be at least one-byte string')
 552
 553        if self._exception is not None:
 554            raise self._exception
 555
 556        # Consume the whole buffer except the last bytes, whose length is
 557        # one less than seplen. Let's check corner cases with
 558        # separator='SEPARATOR':
 559        # * we have received an almost complete separator (without the last
 560        #   byte), i.e. buffer='some textSEPARATO'. In this case we
 561        #   can safely consume len(separator) - 1 bytes.
 562        # * the last byte of the buffer is the first byte of the separator, i.e.
 563        #   buffer='abcdefghijklmnopqrS'. We may safely consume
 564        #   everything except that last byte, but this requires analyzing
 565        #   the bytes of the buffer that match a partial separator.
 566        #   This is slow and/or requires an FSM. For this case our
 567        #   implementation is not optimal, since it requires rescanning
 568        #   data that is known not to belong to the separator. In the
 569        #   real world, the separator will not be long enough to cause
 570        #   performance problems. Even when reading MIME-encoded
 571        #   messages :)
 572
 573        # `offset` is the number of bytes from the beginning of the buffer
 574        # where there is no occurrence of `separator`.
 575        offset = 0
 576
 577        # Loop until we find `separator` in the buffer, exceed the buffer size,
 578        # or an EOF has happened.
 579        while True:
 580            buflen = self._smart_buffer.size()
 581
 582            # Check if we now have enough data in the buffer for `separator` to
 583            # fit.
 584            if buflen - offset >= seplen:
 585                isep = self._smart_buffer.find(separator, offset)
 586
 587                if isep != -1:
 588                    # `separator` is in the buffer. `isep` will be used later
 589                    # to retrieve the data.
 590                    break
 591
 592                # see upper comment for explanation.
 593                offset = buflen + 1 - seplen
 594                if offset > self._limit:
 595                    raise LimitOverrunError(
 596                        'Separator is not found, and chunk exceed the limit',
 597                        offset)
 598
 599            # Complete message (with full separator) may be present in buffer
 600            # even when EOF flag is set. This may happen when the last chunk
 601            # adds data which makes separator be found. That's why we check for
 602        # EOF *after* inspecting the buffer.
 603            if self._eof:
 604                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
 605                raise IncompleteReadError(chunk, None)
 606
 607            # _wait_for_data() will resume reading if stream was paused.
 608            await self._wait_for_data('readuntil')
 609
 610        if isep > self._limit:
 611            raise LimitOverrunError(
 612                'Separator is found, but chunk is longer than limit', isep)
 613
 614        chunk = self._smart_buffer.get_data(isep + seplen)
 615        self._maybe_resume_transport()
 616        return bytes(chunk)
 617
 618    async def read(self, n=-1):
 619        """Read up to `n` bytes from the stream.
 620
 621        If n is not provided, or set to -1, read until EOF and return all read
 622        bytes. If the EOF was received and the internal buffer is empty, return
 623        an empty bytes object.
 624
 625        If n is zero, return an empty bytes object immediately.
 626
 627        If n is positive, this function tries to read `n` bytes, and may return
 628        fewer bytes than requested, but at least one byte. If EOF is
 629        received before any byte is read, this function returns an empty bytes
 630        object.
 631
 632        The returned value is not limited by the limit configured at stream
 633        creation.
 634
 635        If stream was paused, this function will automatically resume it if
 636        needed.
 637        """
 638
 639        if self._exception is not None:
 640            raise self._exception
 641
 642        if n == 0:
 643            return b''
 644
 645        if n < 0:
 646            # This used to just loop creating a new waiter hoping to
 647            # collect everything in self._buffer, but that would
 648            # deadlock if the subprocess sends more than self.limit
 649            # bytes.  So just call self.read(self._limit) until EOF.
 650            blocks = []
 651            while True:
 652                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
 653                if not block:
 654                    break
 655                blocks.append(block)
 656            return b''.join(blocks)
 657
 658        if not self._smart_buffer.size() and not self._eof:
 659            await self._wait_for_data('read')
 660
 661        # This will work right even if buffer is less than n bytes
 662        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))
 663
 664        self._maybe_resume_transport()
 665        return data
 666
 667    async def read_nearly(self, n=-1):
 668        """Read up to `n` bytes from the stream.
 669
 670        If n is not provided, or set to -1, read until EOF and return all read
 671        bytes. If the EOF was received and the internal buffer is empty, return
 672        an empty bytes object.
 673
 674        If n is zero, return an empty bytes object immediately.
 675
 676        If n is positive, this function tries to read `n` bytes, and may return
 677        fewer bytes than requested, but at least one byte. If EOF is
 678        received before any byte is read, this function returns an empty bytes
 679        object.
 680
 681        The returned value is not limited by the limit configured at stream
 682        creation.
 683
 684        If stream was paused, this function will automatically resume it if
 685        needed.
 686        """
 687
 688        if self._exception is not None:
 689            raise self._exception
 690
 691        if n == 0:
 692            return b''
 693
 694        if n < 0:
 695            # This used to just loop creating a new waiter hoping to
 696            # collect everything in self._buffer, but that would
 697            # deadlock if the subprocess sends more than self.limit
 698            # bytes.  So just call self.read(self._limit) until EOF.
 699            blocks = []
 700            while True:
 701                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
 702                if not block:
 703                    break
 704                blocks.append(block)
 705            return b''.join(blocks)
 706
 707        if not self._smart_buffer.size() and not self._eof:
 708            await self._wait_for_data('read')
 709
 710        # This will work right even if buffer is less than n bytes
 711        data = self._smart_buffer.get_data_nearly(n)
 712
 713        self._maybe_resume_transport()
 714        return data
 715
 716    async def readexactly(self, n):
 717        """Read exactly `n` bytes.
 718
 719        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
 720        read. The IncompleteReadError.partial attribute of the exception will
 721        contain the partial read bytes.
 722
 723        If n is zero, return an empty bytes object.
 724
 725        The returned value is not limited by the limit configured at stream
 726        creation.
 727
 728        If stream was paused, this function will automatically resume it if
 729        needed.
 730        """
 731        if n < 0:
 732            raise ValueError('readexactly size can not be less than zero')
 733
 734        if self._exception is not None:
 735            raise self._exception
 736
 737        if n == 0:
 738            return b''
 739
 740        while self._smart_buffer.size() < n:
 741            if self._eof:
 742                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
 743                raise IncompleteReadError(incomplete, n)
 744
 745            await self._wait_for_data('readexactly')
 746
 747        if self._smart_buffer.size() == n:
 748            data = self._smart_buffer.get_data(self._smart_buffer.size())
 749        else:
 750            data = self._smart_buffer.get_data(n)
 751
 752        self._maybe_resume_transport()
 753        return data
 754    
 755    async def readonly_exactly(self, n):
 756        if n < 0:
 757            raise ValueError('readexactly size can not be less than zero')
 758
 759        if self._exception is not None:
 760            raise self._exception
 761
 762        if n == 0:
 763            return b''
 764
 765        while self._smart_buffer.size() < n:
 766            if self._eof:
 767                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
 768                raise IncompleteReadError(incomplete, n)
 769
 770            await self._wait_for_data('readexactly')
 771
 772        if self._smart_buffer.size() == n:
 773            data = self._smart_buffer.read_data(self._smart_buffer.size())
 774        else:
 775            data = self._smart_buffer.read_data(n)
 776
 777        self._maybe_resume_transport()
 778        return data
 779    
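    # Editor's note: readonly_exactly() mirrors readexactly() but peeks via
    # read_data() instead of consuming via get_data(), so the bytes stay in the
    # buffer. The handshake helpers in TcpStreamManager rely on this: they inspect
    # the greeting with readonly_exactly() first and only then consume it with
    # readexactly()/read_message(). For example:
    #
    #     peeked = await reader.readonly_exactly(4)   # buffer left intact
    #     same_bytes = await reader.readexactly(4)    # now actually consumed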
 780    async def read_message(self):
 781        message_len_encoded = await self.readexactly(self._message_protocol_settings.message_size_len)
 782        message_len = int.from_bytes(message_len_encoded, 'little')
 783        return await self.readexactly(message_len)
 784    
 785    def message_awailable(self) -> bool:
 786        message_size_len = self._message_protocol_settings.message_size_len
 787        if self._smart_buffer.size() < message_size_len:
 788            return False
 789
 790        message_len_encoded = self._smart_buffer.read_data(message_size_len)  # peek; get_data() would consume the length prefix
 791        message_len = int.from_bytes(message_len_encoded, 'little')
 792        if self._smart_buffer.size() < (message_size_len + message_len):
 793            return False
 794        
 795        return True
 796    
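    # Usage sketch for the message framing (editor's illustration): each message is a
    # message_size_len-byte little-endian length prefix followed by the payload.
    #
    #     if reader.message_awailable():        # a complete message is buffered
    #         payload = await reader.read_message()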
 797    def transport_pause_reading(self):
 798        try:
 799            self._transport.pause_reading()
 800        except NotImplementedError:
 801            # The transport can't be paused.
 802            # We'll just have to buffer all data.
 803            # Forget the transport so we don't keep trying.
 804            pass
 805        else:
 806            self._paused = True
 807    
 808    def transport_resume_reading(self):
 809        self._paused = False
 810        self._transport.resume_reading()
 811
 812
 813# class StreamReaderProtocolCopy(OriginalStreamReaderProtocol):
 814#     def __init__(self, manager: TcpStreamManager, original_stream_reader_protocol: OriginalStreamReaderProtocol) -> None:
 815#         self._stream_manager = manager
 816#         original_dict: dict = copy(original_stream_reader_protocol.__dict__)
 817#         original_dict.pop('_stream_manager', None)
 818#         original_dict.pop('_client_connected_cb', None)
 819#         self.__dict__.update(original_dict)
 820#         self._original__client_connected_cb = None
 821#         self._client_connected_cb = original_stream_reader_protocol._client_connected_cb
 822    
 823#     async def _wrapper__client_connected_cb(self, reader: OriginalStreamReader, writer: OriginalStreamWriter):
 824#         self._stream_writer = StreamWriterCopy(self._stream_manager, writer)
 825#         if self._original__client_connected_cb is not None:
 826#             await self._original__client_connected_cb(reader, self._stream_writer)
 827    
 828#     @property
 829#     def _client_connected_cb(self):
 830#         if self._original__client_connected_cb is None:
 831#             return None
 832#         else:
 833#             return self._wrapper__client_connected_cb
 834    
 835#     @_client_connected_cb.setter
 836#     def _client_connected_cb(self, value):
 837#         self._original__client_connected_cb = value
 838
 839
 840class TcpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract):
 841    def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
 842        self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs)
 843
 844    def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
 845        super().__init__(*args, **kwargs)
 846
 847        self._stream_manager = manager
 848        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
 849
 850    def connection_made(self, transport):
 851        if self._reject_connection:
 852            context = {
 853                'message': ('An open stream was garbage collected prior to '
 854                            'establishing network connection; '
 855                            'call "stream.close()" explicitly.')
 856            }
 857            if self._source_traceback:
 858                context['source_traceback'] = self._source_traceback
 859            self._loop.call_exception_handler(context)
 860            transport.abort()
 861            return
 862        
 863        self._transport = transport
 864        reader = self._stream_reader
 865        if reader is not None:
 866            reader.set_transport(transport)
 867
 868        self._over_ssl = transport.get_extra_info('sslcontext') is not None
 869        if self._client_connected_cb is not None:
 870            self._stream_writer = TcpStreamWriter(transport, self,
 871                                               reader,
 872                                               self._loop)
 873            res = self._client_connected_cb(reader,
 874                                            self._stream_writer)
 875            if coroutines.iscoroutine(res):
 876                self._loop.create_task(res)
 877            
 878            self._strong_reader = None
 879
 880
 881# class StreamWriterCopy(OriginalStreamWriter):
 882#     def __init__(self, manager: TcpStreamManager, original_stream_writer: OriginalStreamWriter) -> None:
 883#         self._stream_manager = manager
 884#         self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
 885#             external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
 886#         self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
 887#         self._autonomous_writer_future: Task = None
 888#         self._autonomous_writer_future_stop_requessted: bool = False
 889#         self._autonomous_writer_drain_enough_futures: List[Future] = list()
 890#         original_dict: dict = copy(original_stream_writer.__dict__)
 891#         original_dict.pop('_stream_manager', None)
 892#         original_dict.pop('_output_to_client', None)
 893#         original_dict.pop('_autonomous_writer_future_stop_requessted', None)
 894#         self.__dict__.update(original_dict)
 895    
 896#     def optimized_write(self, data):
 897#         # self._output_to_client.add_piece_of_data(data)
 898#         self.write(data)
 899
 900#     def owrite(self, data):
 901#         return self.optimized_write(data)
 902
 903#     async def partial_drain(self):
 904#         await self.drain()
 905#         # another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
 906#         # while another_piece_of_data:
 907#         #     self.write(another_piece_of_data)
 908#         #     await self.drain()
 909#         #     another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
 910
 911#     async def pdrain(self):
 912#         return await self.partial_drain()
 913
 914#     async def full_drain(self):
 915#         await self.pdrain()
 916#         rest_of_the_data_size = self._output_to_client.size()
 917#         if rest_of_the_data_size:
 918#             another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
 919#             self.write(another_piece_of_data)
 920#             await self.drain()
 921
 922#     async def fdrain(self):
 923#         return await self.full_drain()
 924    
 925#     def _release_autonomous_writer_waiters(self):
 926#         current_size = self._output_to_client.size()
 927#         autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures
 928#         self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)()
 929#         for item in autonomous_writer_drain_enough_futures_buff:
 930#             lower_water_size, future = item
 931#             if current_size < lower_water_size:
 932#                 if (not future.done()) and (not future.cancelled()):
 933#                     future.set_result(None)
 934
 935#             if (not future.done()) and (not future.cancelled()):
 936#                 self._autonomous_writer_drain_enough_futures.append(item)
 937    
 938#     async def _autonomous_writer_impl(self):
 939#         ty = TimedYield(0)
 940#         while not self._autonomous_writer_future_stop_requessted:
 941#             another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
 942#             self._release_autonomous_writer_waiters()
 943#             while another_piece_of_data:
 944#                 self.write(another_piece_of_data)
 945#                 await self.drain()
 946#                 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
 947#                 self._release_autonomous_writer_waiters()
 948            
 949#             await ty()
 950    
 951#     def start_autonomous_writer(self):
 952#         self._autonomous_writer_future = create_task(self._autonomous_writer_impl)
 953    
 954#     def start_aw(self):
 955#         return self.start_autonomous_writer()
 956    
 957#     async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
 958#         """_summary_
 959
 960#         Args:
 961#             timeout (Optional[Union[int, float]], optional): Defaults to 0. 0 - infinite timeout; None - default timeout
 962
 963#         Returns:
 964#             _type_: _description_
 965#         """
 966#         result = None
 967#         if timeout is None:
 968#             timeout = self._stream_manager.autonomous_writer_stop_default_timeout
 969        
 970#         if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
 971#             self._autonomous_writer_future_stop_requessted = True
 972#             if timeout:
 973#                 result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
 974#             else:
 975#                 result = await self._autonomous_writer_future
 976            
 977#             self._autonomous_writer_future = None
 978#             self._autonomous_writer_future_stop_requessted = False
 979        
 980#         return result
 981    
 982#     def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
 983#         """_summary_
 984
 985#         Args:
 986#             timeout (Optional[Union[int, float]], optional): Defaults to 0. 0 - infinite timeout; None - default timeout
 987
 988#         Returns:
 989#             _type_: _description_
 990#         """
 991#         return self.stop_autonomous_writer(timeout)
 992    
 993#     async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
 994#         if lower_water_size is None:
 995#             lower_water_size = self.socket_write_fixed_buffer_size.value * 2
 996        
 997#         if lower_water_size <= self._output_to_client.size():
 998#             future: Future = self._loop.create_future()
 999#             self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
1000#             await future
1001
1002    
1003#     async def aw_drain_enough(self):
1004#         await self.autonomous_writer_drain_enough()
1005
1006
1007class TcpStreamWriter(OriginalStreamWriter, StreamWriterAbstract):
1008    def __init__(self, *args, **kwargs) -> None:
1009        self._bind_to_stream_manager(*args, **kwargs)
1010
1011    def _bind_to_stream_manager(self, *args, **kwargs) -> None:
1012        super().__init__(*args, **kwargs)
1013
1014        self._stream_manager: TcpStreamManager = self._protocol._stream_manager
1015        self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
1016            external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
1017        self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
1018        self._autonomous_writer_future: Task = None
1019        self._autonomous_writer_future_stop_requessted: bool = False
1020        self._autonomous_writer_drain_enough_futures: List[Future] = list()
1021
1022    def optimized_write(self, data):
1023        self._output_to_client.add_piece_of_data(data)
1024        # self.write(data)
1025
1026    def owrite(self, data):
1027        return self.optimized_write(data)
1028
1029    async def partial_drain(self):
1030        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1031        while another_piece_of_data:
1032            self.write(another_piece_of_data)
1033            await self.drain()
1034            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1035
1036    async def pdrain(self):
1037        return await self.partial_drain()
1038
1039    async def full_drain(self):
1040        await self.pdrain()
1041        rest_of_the_data_size = self._output_to_client.size()
1042        if rest_of_the_data_size:
1043            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
1044            self.write(another_piece_of_data)
1045            await self.drain()
1046
1047    async def fdrain(self):
1048        return await self.full_drain()
1049    
1050    def _release_autonomous_writer_waiters(self):
1051        current_size = self._output_to_client.size()
1052        autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures
1053        self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)()
1054        for item in autonomous_writer_drain_enough_futures_buff:
1055            lower_water_size, future = item
1056            if current_size < lower_water_size:
1057                if (not future.done()) and (not future.cancelled()):
1058                    future.set_result(None)
1059
1060            if (not future.done()) and (not future.cancelled()):
1061                self._autonomous_writer_drain_enough_futures.append(item)
1062    
1063    async def _autonomous_writer_impl(self):
1064        ty = TimedYield(0)
1065        while not self._autonomous_writer_future_stop_requessted:
1066            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1067            self._release_autonomous_writer_waiters()
1068            while another_piece_of_data:
1069                self.write(another_piece_of_data)
1070                await self.drain()
1071                another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1072                self._release_autonomous_writer_waiters()
1073            
1074            await ty()
1075    
1076    def start_autonomous_writer(self):
1077        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)
1078    
1079    def start_aw(self):
1080        return self.start_autonomous_writer()
1081    
1082    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
1083        """Stop the autonomous writer task and wait for it to finish.
1084
1085        Args:
1086            timeout: Defaults to 0. 0 - infinite timeout (wait as long as needed); None - use the stream manager's default timeout.
1087
1088        Returns:
1089            The task result, or None if the autonomous writer was not running.
1090        """
1091        result = None
1092        if timeout is None:
1093            timeout = self._stream_manager.autonomous_writer_stop_default_timeout
1094        
1095        if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
1096            self._autonomous_writer_future_stop_requessted = True
1097            if timeout:
1098                result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
1099            else:
1100                result = await self._autonomous_writer_future
1101            
1102            self._autonomous_writer_future = None
1103            self._autonomous_writer_future_stop_requessted = False
1104        
1105        return result
1106    
1107    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
1108        """Alias for stop_autonomous_writer().
1109
1110        Args:
1111            timeout: Defaults to 0. 0 - infinite timeout; None - use the stream manager's default timeout.
1112
1113        Returns:
1114            The task result, or None if the autonomous writer was not running.
1115        """
1116        return await self.stop_autonomous_writer(timeout)
1117    
1118    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
1119        if lower_water_size is None:
1120            lower_water_size = self.socket_write_fixed_buffer_size.value * 3
1121            # print(f'lower_water_size: {lower_water_size}')
1122            # lower_water_size = cpu_info().l3_cache_size
1123        
1124        if lower_water_size <= self._output_to_client.size():
1125            future: Future = self._loop.create_future()
1126            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
1127            await future
1128    
1129    async def aw_drain_enough(self):
1130        await self.autonomous_writer_drain_enough()
1131    
1132    def optimized_write_message(self, data):
1133        self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings.message_size_len, 'little') + data)
1134    
1135    def owrite_message(self, data):
1136        self.optimized_write_message(data)
1137    
1138    async def send_message(self, data):
1139        self.optimized_write_message(data)
1140        await self.fdrain()
1141    
1142    @asynccontextmanager
1143    async def autonomous_writer(self):
1144        self.start_autonomous_writer()
1145        try:
1146            yield
1147        finally:
1148            await self.stop_autonomous_writer()
1149    
1150    aw = autonomous_writer
1151
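    # Usage sketch (editor's illustration, not part of the original module): queue
    # writes through the background autonomous writer and apply write backpressure:
    #
    #     async with writer.autonomous_writer():
    #         for chunk in chunks:
    #             writer.owrite(chunk)             # buffered; sent by the background task
    #             await writer.aw_drain_enough()   # wait while the backlog is too large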
1152    def __enter__(self):
1153        pass  # returns None; the with-statement here only guarantees close() on exit
1154
1155    def __exit__(self, exc_type, exc, tb):
1156        self.close()
class StreamType(enum.Enum):
49class StreamType(Enum):
50    general = 0
51    message_based_anonymous = 1
52    message_based_names = 2
53    gate = 3

An enumeration.

general = <StreamType.general: 0>
message_based_anonymous = <StreamType.message_based_anonymous: 1>
message_based_names = <StreamType.message_based_names: 2>
gate = <StreamType.gate: 3>
Inherited Members
enum.Enum
name
value
class GateSecurityPolicy(enum.Enum):
56class GateSecurityPolicy(Enum):
57    # a gate will either allow only the selected stream names to connect, or ban them from connecting
58    allowed = 0
59    disabled = 1

An enumeration.

allowed = <GateSecurityPolicy.allowed: 0>
disabled = <GateSecurityPolicy.disabled: 1>
Inherited Members
enum.Enum
name
value
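
A hedged sketch of how this policy pairs with TcpStreamManager.start_server() below (host, port, and the stream name 'worker' are illustrative; `on_client` is the (reader, writer) callback described under start_server()):

    import asyncio

    async def on_client(reader, writer):
        ...

    async def main():
        manager = TcpStreamManager()
        # Gate that admits only streams named 'worker':
        server = await manager.start_server(
            on_client, '127.0.0.1', 8888,
            stream_type=StreamType.gate,
            gate_security_policy=GateSecurityPolicy.allowed,
            policy_managed_stream_names={'worker'},
        )
        async with server:
            await server.serve_forever()

    asyncio.run(main())
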
class StreamManagerIOCoreMemoryManagement(cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement):
62class StreamManagerIOCoreMemoryManagement(IOCoreMemoryManagement):
63    def __init__(self):
64        super(StreamManagerIOCoreMemoryManagement, self).__init__()
65
66        self.socket_write_fixed_buffer_size = ValueExistence(True,
67                                                             cpu_info().l2_cache_size_per_virtual_core)
68
69    def link_to(self, parent):
70        super(StreamManagerIOCoreMemoryManagement, self).link_to(parent)
71        try:
72            self.socket_write_fixed_buffer_size = parent.socket_write_fixed_buffer_size
73        except AttributeError:
74            pass
socket_write_fixed_buffer_size
Inherited Members
cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement
global__data_size_limit
global__data_full_size
global__deletable_data_full_size
global_other__data_size_limit
global_other__data_full_size
global_other__deletable_data_full_size
global_in__data_size_limit
global_in__data_full_size
global_in__deletable_data_full_size
global_out__data_size_limit
global_out__data_full_size
global_out__deletable_data_full_size
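
A small sketch of the linking behavior shown above: after link_to(), the child shares the parent's socket_write_fixed_buffer_size ValueExistence instance, so tuning the write-buffer size in one place affects every linked manager.

    parent = StreamManagerIOCoreMemoryManagement()
    child = StreamManagerIOCoreMemoryManagement()
    child.link_to(parent)
    # Both now reference the same ValueExistence instance:
    assert child.socket_write_fixed_buffer_size is parent.socket_write_fixed_buffer_size
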
class TcpStreamManager(cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract):
 83class TcpStreamManager(StreamManagerAbstract):
 84    def __init__(self) -> None:
 85        self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement()
 86        self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0
 87        self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 88        self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 89
 90    async def open_connection(self, host=None, port=None, *,
 91                            loop=None, limit=DEFAULT_LIMIT,
 92                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 93                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 94                            max_message_size_len: Optional[int] = None,
 95                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
 96        """A wrapper for create_connection() returning a (reader, writer) pair.
 97
 98        The reader returned is a TcpStreamReader instance; the writer is a
 99        TcpStreamWriter instance.
100
101        The arguments are all the usual arguments to create_connection()
102        except protocol_factory; most common are positional host and port,
103        with various optional keyword arguments following.
104
105        Additional optional keyword arguments are loop (to set the event loop
106        instance to use) and limit (to set the buffer limit passed to the
107        TcpStreamReader).
108
109        (If you want to customize the TcpStreamReader and/or
110        TcpStreamReaderProtocol classes, just copy the code -- there's
111        really nothing special here except some convenience.)
112        """
113        if StreamType.gate == stream_type:
114            raise ValueError('Wrong stream_type value: a client cannot be a "gate".')
115        
116        if loop is None:
117            loop = events.get_event_loop()
118        else:
119            warnings.warn("The loop argument is deprecated since Python 3.8, "
120                        "and scheduled for removal in Python 3.10.",
121                        DeprecationWarning, stacklevel=2)
122        
123        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
124        reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
125        protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
126        transport, _ = await loop.create_connection(
127            lambda: protocol, host, port, **kwds)
128        writer = TcpStreamWriter(transport, protocol, reader, loop)
129        return reader, writer
130    
131    def bind_existing_connection(self, reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT,
132                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
133                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
134                            max_message_size_len: Optional[int] = None,
135                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
136        reader = reinterpret_cast(TcpStreamReader, reader)
137        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
138        reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop)
139        
140        protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol)
141        protocol._bind_to_stream_manager(message_protocol_settings, reader, loop=loop)
142        
143        transport = writer._transport
144        if isinstance(transport, _SSLProtocolTransport):
145            transport = cast(_SSLProtocolTransport, transport)
146            ssl_protocol: SSLProtocol = transport._ssl_protocol
147            ssl_protocol._set_app_protocol(protocol)
148        elif isinstance(transport, _ProactorSocketTransport):
149            transport = cast(_ProactorSocketTransport, transport)
150            transport.set_protocol(protocol)
151        elif isinstance(transport, _SelectorSocketTransport):
152            transport = cast(_SelectorSocketTransport, transport)
153            transport.set_protocol(protocol)
154        else:
155            raise RuntimeError(f'Unsupported transport type: {type(transport)}')
156
157        writer = reinterpret_cast(TcpStreamWriter, writer)
158        writer._bind_to_stream_manager(self, transport, protocol, reader, loop)
159
160        return reader, writer
161
162    async def start_server(self, client_connected_cb, host=None, port=None, *,
163                        loop=None, limit=DEFAULT_LIMIT,
164                        stream_type: StreamType = StreamType.general, stream_name: str = str(),
165                        gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
166                        protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
167                        max_message_size_len: Optional[int] = None,
168                        **kwds):
169        """Start a socket server, call back for each client connected.
170
171        The first parameter, `client_connected_cb`, takes two parameters:
172        client_reader, client_writer.  client_reader is a TcpStreamReader
173        object, while client_writer is a TcpStreamWriter object.  This
174        parameter can either be a plain callback function or a coroutine;
175        if it is a coroutine, it will be automatically converted into a
176        Task.
177
178        The rest of the arguments are all the usual arguments to
179        loop.create_server() except protocol_factory; most common are
180        positional host and port, with various optional keyword arguments
181        following.  The return value is the same as loop.create_server().
182
183        Additional optional keyword arguments are loop (to set the event loop
184        instance to use) and limit (to set the buffer limit passed to the
185        TcpStreamReader).
186
187        The return value is the same as loop.create_server(), i.e. a
188        Server object which can be used to stop the service.
189        """
190        if loop is None:
191            loop = events.get_event_loop()
192        else:
193            warnings.warn("The loop argument is deprecated since Python 3.8, "
194                        "and scheduled for removal in Python 3.10.",
195                        DeprecationWarning, stacklevel=2)
196
197        def factory():
198            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
199            reader =  TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
200            protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
201                                            loop=loop)
202            return protocol
203
204        return await loop.create_server(factory, host, port, **kwds)
205
206    def bind_accepted_connection(self, 
207                            reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT,
208                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
209                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
210                            max_message_size_len: Optional[int] = None,
211                            **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']:
212        reader = reinterpret_cast(TcpStreamReader, reader)
213        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
214        reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop)
215        
216        protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol)
217        client_connected_cb = protocol._client_connected_cb
218        protocol._bind_to_stream_manager(message_protocol_settings, reader, client_connected_cb, loop=loop)
219        
220        transport = writer._transport
221        if isinstance(transport, _SSLProtocolTransport):
222            transport = cast(_SSLProtocolTransport, transport)
223            ssl_protocol: SSLProtocol = transport._ssl_protocol
224            ssl_protocol._set_app_protocol(protocol)
225        elif isinstance(transport, _ProactorSocketTransport):
226            transport = cast(_ProactorSocketTransport, transport)
227            transport.set_protocol(protocol)
228        elif isinstance(transport, _SelectorSocketTransport):
229            transport = cast(_SelectorSocketTransport, transport)
230            transport.set_protocol(protocol)
231        else:
232            raise RuntimeError(f'Unsupported transport type: {type(transport)}')
233
234        writer = reinterpret_cast(TcpStreamWriter, writer)
235        writer._bind_to_stream_manager(self, transport, protocol, reader, loop)
236
237        return reader, writer
238
239    async def try_establish_message_protocol_server_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool:
240        message_size_len_encoded = await reader.readonly_exactly(1)
241        message_size_len = int.from_bytes(message_size_len_encoded, 'little')
242        can_be_established: bool = False
243        if message_size_len <= reader._message_protocol_settings.max_message_size_len:
244            message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:]
245            message_size = int.from_bytes(message_size_encoded, 'little')
246            message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:]
247            if message == reader._message_protocol_settings.protocol_greeting:
248                can_be_established = True
249        
250        if can_be_established:
251            reader._message_protocol_settings.message_size_len = message_size_len
252            writer._protocol._message_protocol_settings.message_size_len = message_size_len
253            await reader.readexactly(1)
254            await reader.read_message()
255            message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting
256            writer.write(message)
257            await writer.full_drain()
258            return True
259        else:
260            return False
261    
262    async def try_establish_message_protocol_client_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool:
263        message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting
264        writer.write(message)
265        await writer.full_drain()
266        message_size_len_encoded = await reader.readonly_exactly(1)
267        message_size_len = int.from_bytes(message_size_len_encoded, 'little')
268        can_be_established: bool = False
269        if message_size_len <= reader._message_protocol_settings.max_message_size_len:
270            message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:]
271            message_size = int.from_bytes(message_size_encoded, 'little')
272            message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:]
273            if message == reader._message_protocol_settings.protocol_greeting:
274                can_be_established = True
275        
276        if can_be_established:
277            reader._message_protocol_settings.message_size_len = message_size_len
278            writer._protocol._message_protocol_settings.message_size_len = message_size_len
279            await reader.readexactly(1)
280            await reader.read_message()
281            return True
282        else:
283            return False
io_memory_management: StreamManagerIOCoreMemoryManagement
autonomous_writer_stop_default_timeout: Union[int, float, NoneType]
output_to_client_container_type
input_from_client_container_type
async def open_connection( self, host=None, port=None, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds) -> tuple[TcpStreamReader, TcpStreamWriter]:

A wrapper for create_connection() returning a (reader, writer) pair.

The reader returned is a TcpStreamReader instance; the writer is a TcpStreamWriter instance.

The arguments are all the usual arguments to create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the TcpStreamReader).

(If you want to customize the TcpStreamReader and/or TcpStreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.)
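
A minimal client sketch (host and port are illustrative):

    import asyncio

    async def main():
        manager = TcpStreamManager()
        reader, writer = await manager.open_connection('127.0.0.1', 8888)
        writer.write(b'ping')
        await writer.drain()
        reply = await reader.read(1024)
        writer.close()

    asyncio.run(main())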

def bind_existing_connection( self, reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds) -> tuple[TcpStreamReader, TcpStreamWriter]:
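
A hedged sketch of upgrading an already-open plain asyncio stream pair in place (endpoint values are illustrative):

    import asyncio

    async def main():
        plain_reader, plain_writer = await asyncio.open_connection('127.0.0.1', 8888)
        manager = TcpStreamManager()
        reader, writer = manager.bind_existing_connection(plain_reader, plain_writer)
        # reader/writer are now TcpStreamReader/TcpStreamWriter over the same transport

    asyncio.run(main())
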
async def start_server( self, client_connected_cb, host=None, port=None, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', gate_security_policy: GateSecurityPolicy = <GateSecurityPolicy.disabled: 1>, policy_managed_stream_names: Union[Set[str], NoneType] = None, protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds):

Start a socket server, call back for each client connected.

The first parameter, client_connected_cb, takes two parameters: client_reader, client_writer. client_reader is a TcpStreamReader object, while client_writer is a TcpStreamWriter object. This parameter can either be a plain callback function or a coroutine; if it is a coroutine, it will be automatically converted into a Task.

The rest of the arguments are all the usual arguments to loop.create_server() except protocol_factory; most common are positional host and port, with various optional keyword arguments following. The return value is the same as loop.create_server().

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the TcpStreamReader).

The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service.
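
A minimal echo-server sketch following the callback contract above (host and port are illustrative):

    import asyncio

    async def on_client(reader, writer):
        data = await reader.read(1024)
        writer.write(data)
        await writer.drain()
        writer.close()

    async def main():
        manager = TcpStreamManager()
        server = await manager.start_server(on_client, '127.0.0.1', 8888)
        async with server:
            await server.serve_forever()

    asyncio.run(main())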

def bind_accepted_connection( self, reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds) -> tuple[TcpStreamReader, TcpStreamWriter]:
async def try_establish_message_protocol_server_side( self, reader: TcpStreamReader, writer: TcpStreamWriter) -> bool:
async def try_establish_message_protocol_client_side( self, reader: TcpStreamReader, writer: TcpStreamWriter) -> bool:
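
For reference, the greeting frame both handshake methods above exchange is: one little-endian byte holding message_size_len, then the greeting length encoded in message_size_len bytes, then the greeting itself. A standalone sketch with illustrative values (the real ones come from MessageProtocolSettings):

    message_size_len = 2
    greeting = b'example-greeting'  # illustrative
    frame = (message_size_len.to_bytes(1, 'little')
             + len(greeting).to_bytes(message_size_len, 'little')
             + greeting)

    # Parsing mirrors the readonly_exactly() sequence above:
    parsed_size_len = int.from_bytes(frame[:1], 'little')
    greeting_len = int.from_bytes(frame[1:1 + parsed_size_len], 'little')
    assert frame[1 + parsed_size_len:1 + parsed_size_len + greeting_len] == greeting
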
class TcpStreamReader(asyncio.streams.StreamReader, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract):
377class TcpStreamReader(OriginalStreamReader, StreamReaderAbstract):
378    def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
379        self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs)
380
381    def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
382        super().__init__(*args, **kwargs)
383        self._stream_manager = manager
384        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
385        self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type(
386            external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size)
387        self.recv_buff_size_computer = RecvBuffSizeComputer()
388        cpu_info_inst = cpu_info()
389        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
390        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
391        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
392        # self.recv_buff_size_computer.max_recv_buff_size = 3145728
393        self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
394        # self.recv_buff_size_computer.max_recv_buff_size = 1024
395        # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
396        self.limit_by_limit: bool = True
397        self.limit_by_global_in__data_size_limit: bool = True
398    
399    async def read_max(self):
400        return await self.read(self._limit)
401    
402    async def read_nearly_max(self):
403        return await self.read_nearly(self._limit)
404    
405    async def read_with_counter(self):
406        if self._exception is not None:
407            raise self._exception
408
409        # This used to just loop creating a new waiter hoping to
410        # collect everything in self._buffer, but that would
411        # deadlock if the subprocess sends more than self.limit
412        # bytes.  So just call self.read(self._limit) until EOF.
413        blocks = []
414        counter = 0
415        while True:
416            block = await self.read_max()
417            counter += 1
418            if not block:
419                break
420            blocks.append(block)
421        return b''.join(blocks), counter
422
423    def __repr__(self):
424        info = ['TcpStreamReader']
425        if self._smart_buffer.size():
426            info.append(f'{self._smart_buffer.size()} bytes')
427        if self._eof:
428            info.append('eof')
429        if self._limit != DEFAULT_LIMIT:
430            info.append(f'limit={self._limit}')
431        if self._waiter:
432            info.append(f'waiter={self._waiter!r}')
433        if self._exception:
434            info.append(f'exception={self._exception!r}')
435        if self._transport:
436            info.append(f'transport={self._transport!r}')
437        if self._paused:
438            info.append('paused')
439        return '<{}>'.format(' '.join(info))
440
441    def _maybe_resume_transport(self):
442        if isinstance(self._transport, (
443            proactor_events._ProactorDatagramTransport,
444            selector_events._SelectorTransport,
445            unix_events._UnixReadPipeTransport
446            )):
447            # if hasattr(self._transport, 'max_size'):
448            try:
449                self._transport.max_size = self.recv_buff_size_computer.recv_buff_size
450                # print(f'max_size: {self._transport.max_size}')
451            except AttributeError:
452                pass
453        else:
454            print(f'Unsupported transport: {type(self._transport)}')
455        
456        if self._paused \
457            and (
458                ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \
459                or (self.limit_by_limit and (not self._limit)) \
460                or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \
461                or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \
462                or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value))
463            ):
464            self._paused = False
465            self._transport.resume_reading()
466
467    def at_eof(self):
468        """Return True if the buffer is empty and 'feed_eof' was called."""
469        return self._eof and not self._smart_buffer.size()
470
471    def feed_data(self, data):
472        assert not self._eof, 'feed_data after feed_eof'
473
474        if not data:
475            return
476
477        data_len = len(data)
478        self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
479        self._smart_buffer.add_piece_of_data(data)
480        self._wakeup_waiter()
481
482        if (self._transport is not None and
483                not self._paused 
484                and (
485                    (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit)
486                    or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value)))
487                )):
488            try:
489                self._transport.pause_reading()
490            except NotImplementedError:
491                # The transport can't be paused.
492                # We'll just have to buffer all data.
493                # Forget the transport so we don't keep trying.
494                self._transport = None
495            else:
496                self._paused = True
497
498    async def readline(self):
499        """Read chunk of data from the stream until newline (b'\n') is found.
500
501        On success, return chunk that ends with newline. If only partial
502        line can be read due to EOF, return incomplete line without
503        terminating newline. When EOF was reached while no bytes read, empty
504        bytes object is returned.
505
506        If limit is reached, ValueError will be raised. In that case, if
507        newline was found, complete line including newline will be removed
508        from internal buffer. Else, internal buffer will be cleared. Limit is
509        compared against part of the line without newline.
510
511        If stream was paused, this function will automatically resume it if
512        needed.
513        """
514        sep = b'\n'
515        seplen = len(sep)
516        try:
517            line = await self.readuntil(sep)
518        except IncompleteReadError as e:
519            return e.partial
520        except LimitOverrunError as e:
521            if self._smart_buffer.startswith(sep, e.consumed):
522                self._smart_buffer.get_data(e.consumed + seplen)
523            else:
524                self._smart_buffer.clear()
525            
526            self._maybe_resume_transport()
527            raise ValueError(e.args[0])
528        return line
529
530    async def readuntil(self, separator=b'\n'):
531        """Read data from the stream until ``separator`` is found.
532
533        On success, the data and separator will be removed from the
534        internal buffer (consumed). Returned data will include the
535        separator at the end.
536
537        Configured stream limit is used to check result. Limit sets the
538        maximal length of data that can be returned, not counting the
539        separator.
540
541        If an EOF occurs and the complete separator is still not found,
542        an IncompleteReadError exception will be raised, and the internal
543        buffer will be reset.  The IncompleteReadError.partial attribute
544        may contain the separator partially.
545
546        If the data cannot be read because of over limit, a
547        LimitOverrunError exception  will be raised, and the data
548        will be left in the internal buffer, so it can be read again.
549        """
550        seplen = len(separator)
551        if seplen == 0:
552            raise ValueError('Separator should be at least one-byte string')
553
554        if self._exception is not None:
555            raise self._exception
556
557        # Consume whole buffer except last bytes, which length is
558        # one less than seplen. Let's check corner cases with
559        # separator='SEPARATOR':
560        # * we have received almost complete separator (without last
561        #   byte). i.e buffer='some textSEPARATO'. In this case we
562        #   can safely consume len(separator) - 1 bytes.
563        # * last byte of buffer is first byte of separator, i.e.
564        #   buffer='abcdefghijklmnopqrS'. We may safely consume
565        #   everything except that last byte, but this require to
566        #   analyze bytes of buffer that match partial separator.
567        #   This is slow and/or require FSM. For this case our
568        #   implementation is not optimal, since require rescanning
569        #   of data that is known to not belong to separator. In
570        #   real world, separator will not be so long to notice
571        #   performance problems. Even when reading MIME-encoded
572        #   messages :)
573
574        # `offset` is the number of bytes from the beginning of the buffer
575        # where there is no occurrence of `separator`.
576        offset = 0
577
578        # Loop until we find `separator` in the buffer, exceed the buffer size,
579        # or an EOF has happened.
580        while True:
581            buflen = self._smart_buffer.size()
582
583            # Check if we now have enough data in the buffer for `separator` to
584            # fit.
585            if buflen - offset >= seplen:
586                isep = self._smart_buffer.find(separator, offset)
587
588                if isep != -1:
589                    # `separator` is in the buffer. `isep` will be used later
590                    # to retrieve the data.
591                    break
592
593                # see upper comment for explanation.
594                offset = buflen + 1 - seplen
595                if offset > self._limit:
596                    raise LimitOverrunError(
597                        'Separator is not found, and chunk exceed the limit',
598                        offset)
599
600            # Complete message (with full separator) may be present in buffer
601            # even when EOF flag is set. This may happen when the last chunk
602            # adds data which makes separator be found. That's why we check for
603            # EOF *after* inspecting the buffer.
604            if self._eof:
605                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
606                raise IncompleteReadError(chunk, None)
607
608            # _wait_for_data() will resume reading if stream was paused.
609            await self._wait_for_data('readuntil')
610
611        if isep > self._limit:
612            raise LimitOverrunError(
613                'Separator is found, but chunk is longer than limit', isep)
614
615        chunk = self._smart_buffer.get_data(isep + seplen)
616        self._maybe_resume_transport()
617        return bytes(chunk)
618
619    async def read(self, n=-1):
620        """Read up to `n` bytes from the stream.
621
622        If n is not provided, or set to -1, read until EOF and return all read
623        bytes. If the EOF was received and the internal buffer is empty, return
624        an empty bytes object.
625
626        If n is zero, return empty bytes object immediately.
627
628        If n is positive, this function tries to read `n` bytes, and may return
629        fewer bytes than requested, but at least one byte. If EOF was
630        received before any byte is read, this function returns an empty bytes
631        object.
632
633        The returned value is not limited by the limit configured at stream
634        creation.
635
636        If stream was paused, this function will automatically resume it if
637        needed.
638        """
639
640        if self._exception is not None:
641            raise self._exception
642
643        if n == 0:
644            return b''
645
646        if n < 0:
647            # This used to just loop creating a new waiter hoping to
648            # collect everything in self._buffer, but that would
649            # deadlock if the subprocess sends more than self.limit
650            # bytes.  So just call self.read(self._limit) until EOF.
651            blocks = []
652            while True:
653                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
654                if not block:
655                    break
656                blocks.append(block)
657            return b''.join(blocks)
658
659        if not self._smart_buffer.size() and not self._eof:
660            await self._wait_for_data('read')
661
662        # This will work right even if buffer is less than n bytes
663        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))
664
665        self._maybe_resume_transport()
666        return data
667
668    async def read_nearly(self, n=-1):
669        """Read up to `n` bytes from the stream.
670
671        If n is not provided, or set to -1, read until EOF and return all read
672        bytes. If the EOF was received and the internal buffer is empty, return
673        an empty bytes object.
674
675        If n is zero, return empty bytes object immediately.
676
677        If n is positive, this function tries to read `n` bytes, and may return
678        fewer bytes than requested, but at least one byte. If EOF was
679        received before any byte is read, this function returns an empty bytes
680        object.
681
682        The returned value is not limited by the limit configured at stream
683        creation.
684
685        If stream was paused, this function will automatically resume it if
686        needed.
687        """
688
689        if self._exception is not None:
690            raise self._exception
691
692        if n == 0:
693            return b''
694
695        if n < 0:
696            # This used to just loop creating a new waiter hoping to
697            # collect everything in self._buffer, but that would
698            # deadlock if the subprocess sends more than self.limit
699            # bytes.  So just call self.read(self._limit) until EOF.
700            blocks = []
701            while True:
702                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
703                if not block:
704                    break
705                blocks.append(block)
706            return b''.join(blocks)
707
708        if not self._smart_buffer.size() and not self._eof:
709            await self._wait_for_data('read')
710
711        # This will work right even if buffer is less than n bytes
712        data = self._smart_buffer.get_data_nearly(n)
713
714        self._maybe_resume_transport()
715        return data
716
717    async def readexactly(self, n):
718        """Read exactly `n` bytes.
719
720        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
721        read. The IncompleteReadError.partial attribute of the exception will
722        contain the partial read bytes.
723
724        If n is zero, return an empty bytes object.
725
726        The returned value is not limited by the limit configured at stream
727        creation.
728
729        If stream was paused, this function will automatically resume it if
730        needed.
731        """
732        if n < 0:
733            raise ValueError('readexactly size cannot be less than zero')
734
735        if self._exception is not None:
736            raise self._exception
737
738        if n == 0:
739            return b''
740
741        while self._smart_buffer.size() < n:
742            if self._eof:
743                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
744                raise IncompleteReadError(incomplete, n)
745
746            await self._wait_for_data('readexactly')
747
748        if self._smart_buffer.size() == n:
749            data = self._smart_buffer.get_data(self._smart_buffer.size())
750        else:
751            data = self._smart_buffer.get_data(n)
752
753        self._maybe_resume_transport()
754        return data
755    
756    async def readonly_exactly(self, n):
757        if n < 0:
758            raise ValueError('readonly_exactly size cannot be less than zero')
759
760        if self._exception is not None:
761            raise self._exception
762
763        if n == 0:
764            return b''
765
766        while self._smart_buffer.size() < n:
767            if self._eof:
768                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
769                raise IncompleteReadError(incomplete, n)
770
771            await self._wait_for_data('readonly_exactly')
772
773        if self._smart_buffer.size() == n:
774            data = self._smart_buffer.read_data(self._smart_buffer.size())
775        else:
776            data = self._smart_buffer.read_data(n)
777
778        self._maybe_resume_transport()
779        return data
780    
781    async def read_message(self):
782        message_len_encoded = await self.readexactly(self._message_protocol_settings.message_size_len)
783        message_len = int.from_bytes(message_len_encoded, 'little')
784        return await self.readexactly(message_len)
785    
786    def message_awailable(self) -> bool:
787        message_size_len = self._message_protocol_settings.message_size_len
788        if self._smart_buffer.size() < message_size_len:
789            return False
790
791        message_len_encoded = self._smart_buffer.read_data(message_size_len)  # peek without consuming
792        message_len = int.from_bytes(message_len_encoded, 'little')
793        if self._smart_buffer.size() < (message_size_len + message_len):
794            return False
795        
796        return True
797    
798    def transport_pause_reading(self):
799        try:
800            self._transport.pause_reading()
801        except NotImplementedError:
802            # The transport can't be paused.
803            # We'll just have to buffer all data.
804            # Forget the transport so we don't keep trying.
805            pass
806        else:
807            self._paused = True
808    
809    def transport_resume_reading(self):
810        self._paused = False
811        self._transport.resume_reading()
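
A short sketch of the peek-then-consume pattern these reader methods enable (`reader` is illustrative and assumes an established message protocol): readonly_exactly() waits for bytes without consuming them, message_awailable() tests for a complete buffered frame, and read_message() consumes one length-prefixed message.

    async def next_message_if_ready(reader: TcpStreamReader):
        if reader.message_awailable():           # complete frame already buffered?
            return await reader.read_message()   # consume length prefix + payload
        return None
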
TcpStreamReader( manager: TcpStreamManager, message_protocol_settings: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_base_internal.MessageProtocolSettings, *args, **kwargs)
async def read_max(self):
async def read_nearly_max(self):
async def read_with_counter(self):
def at_eof(self):

Return True if the buffer is empty and 'feed_eof' was called.

def feed_data(self, data):
async def readline(self):

Read chunk of data from the stream until newline (b'\n') is found.

On success, return chunk that ends with newline. If only partial line can be read due to EOF, return incomplete line without terminating newline. When EOF was reached while no bytes read, empty bytes object is returned.

If limit is reached, ValueError will be raised. In that case, if newline was found, complete line including newline will be removed from internal buffer. Else, internal buffer will be cleared. Limit is compared against part of the line without newline.

If stream was paused, this function will automatically resume it if needed.
async def readuntil(self, separator=b'\n'):
530    async def readuntil(self, separator=b'\n'):
531        """Read data from the stream until ``separator`` is found.
532
533        On success, the data and separator will be removed from the
534        internal buffer (consumed). Returned data will include the
535        separator at the end.
536
537        Configured stream limit is used to check result. Limit sets the
538        maximal length of data that can be returned, not counting the
539        separator.
540
541        If an EOF occurs and the complete separator is still not found,
542        an IncompleteReadError exception will be raised, and the internal
543        buffer will be reset.  The IncompleteReadError.partial attribute
544        may contain the separator partially.
545
546        If the data cannot be read because of over limit, a
547        LimitOverrunError exception  will be raised, and the data
548        will be left in the internal buffer, so it can be read again.
549        """
550        seplen = len(separator)
551        if seplen == 0:
552            raise ValueError('Separator should be at least one-byte string')
553
554        if self._exception is not None:
555            raise self._exception
556
557        # Consume whole buffer except last bytes, which length is
558        # one less than seplen. Let's check corner cases with
559        # separator='SEPARATOR':
560        # * we have received almost complete separator (without last
561        #   byte). i.e buffer='some textSEPARATO'. In this case we
562        #   can safely consume len(separator) - 1 bytes.
563        # * last byte of buffer is first byte of separator, i.e.
564        #   buffer='abcdefghijklmnopqrS'. We may safely consume
565        #   everything except that last byte, but this require to
566        #   analyze bytes of buffer that match partial separator.
567        #   This is slow and/or require FSM. For this case our
568        #   implementation is not optimal, since require rescanning
569        #   of data that is known to not belong to separator. In
570        #   real world, separator will not be so long to notice
571        #   performance problems. Even when reading MIME-encoded
572        #   messages :)
573
574        # `offset` is the number of bytes from the beginning of the buffer
575        # where there is no occurrence of `separator`.
576        offset = 0
577
578        # Loop until we find `separator` in the buffer, exceed the buffer size,
579        # or an EOF has happened.
580        while True:
581            buflen = self._smart_buffer.size()
582
583            # Check if we now have enough data in the buffer for `separator` to
584            # fit.
585            if buflen - offset >= seplen:
586                isep = self._smart_buffer.find(separator, offset)
587
588                if isep != -1:
589                    # `separator` is in the buffer. `isep` will be used later
590                    # to retrieve the data.
591                    break
592
593                # see upper comment for explanation.
594                offset = buflen + 1 - seplen
595                if offset > self._limit:
596                    raise LimitOverrunError(
597                        'Separator is not found, and chunk exceed the limit',
598                        offset)
599
600            # Complete message (with full separator) may be present in buffer
601            # even when EOF flag is set. This may happen when the last chunk
602            # adds data which makes separator be found. That's why we check for
603            # EOF *after* inspecting the buffer.
604            if self._eof:
605                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
606                raise IncompleteReadError(chunk, None)
607
608            # _wait_for_data() will resume reading if stream was paused.
609            await self._wait_for_data('readuntil')
610
611        if isep > self._limit:
612            raise LimitOverrunError(
613                'Separator is found, but chunk is longer than limit', isep)
614
615        chunk = self._smart_buffer.get_data(isep + seplen)
616        self._maybe_resume_transport()
617        return bytes(chunk)

Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

Configured stream limit is used to check result. Limit sets the maximal length of data that can be returned, not counting the separator.

If an EOF occurs and the complete separator is still not found, an IncompleteReadError exception will be raised, and the internal buffer will be reset. The IncompleteReadError.partial attribute may contain the separator partially.

If the data cannot be read because the limit is exceeded, a LimitOverrunError exception will be raised, and the data will be left in the internal buffer, so it can be read again.
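
A hedged usage sketch of both failure paths (`read_record` and the b'\x00' separator are illustrative, not part of the module):

    from asyncio import IncompleteReadError, LimitOverrunError

    async def read_record(reader):
        try:
            data = await reader.readuntil(b'\x00')
        except IncompleteReadError as e:
            return e.partial            # EOF before a full record arrived
        except LimitOverrunError:
            # The record exceeds the stream limit; the data stays buffered,
            # so consume or discard it before calling readuntil() again.
            raise
        return data[:-1]                # strip the separator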

async def read(self, n=-1):
619    async def read(self, n=-1):
620        """Read up to `n` bytes from the stream.
621
622        If n is not provided, or set to -1, read until EOF and return all read
623        bytes. If the EOF was received and the internal buffer is empty, return
624        an empty bytes object.
625
626        If n is zero, return empty bytes object immediately.
627
628        If n is positive, this function tries to read `n` bytes, and may
629        return fewer bytes than requested, but at least one byte. If EOF
630        was received before any byte is read, this function returns an
631        empty bytes object.
632
633        The returned value is not limited by the `limit` configured at
634        stream creation.
635
636        If the stream was paused, this function will automatically resume
637        it if needed.
638        """
639
640        if self._exception is not None:
641            raise self._exception
642
643        if n == 0:
644            return b''
645
646        if n < 0:
647            # This used to just loop creating a new waiter hoping to
648            # collect everything in self._buffer, but that would
649            # deadlock if the subprocess sends more than self.limit
650            # bytes.  So just call self.read_nearly() until EOF.
651            blocks = []
652            while True:
653                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
654                if not block:
655                    break
656                blocks.append(block)
657            return b''.join(blocks)
658
659        if not self._smart_buffer.size() and not self._eof:
660            await self._wait_for_data('read')
661
662        # This will work right even if buffer is less than n bytes
663        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))
664
665        self._maybe_resume_transport()
666        return data

Read up to n bytes from the stream.

If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return empty bytes object immediately.

If n is positive, this function tries to read n bytes and may return fewer bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
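
Since read(-1) already loops internally in bounded blocks, an explicit chunked loop is equivalent in effect; a sketch (`drain_to_eof` is hypothetical):

    async def drain_to_eof(reader, chunk_size=64 * 1024):
        chunks = []
        while True:
            chunk = await reader.read(chunk_size)
            if not chunk:
                break  # read() returns b'' only at EOF
            chunks.append(chunk)
        return b''.join(chunks)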

async def read_nearly(self, n=-1):
668    async def read_nearly(self, n=-1):
669        """Read up to `n` bytes from the stream.
670
671        If n is not provided, or set to -1, read until EOF and return all read
672        bytes. If the EOF was received and the internal buffer is empty, return
673        an empty bytes object.
674
675        If n is zero, return empty bytes object immediately.
676
677        If n is positive, this function tries to read `n` bytes, and may
678        return fewer bytes than requested, but at least one byte. If EOF
679        was received before any byte is read, this function returns an
680        empty bytes object.
681
682        The returned value is not limited by the `limit` configured at
683        stream creation.
684
685        If the stream was paused, this function will automatically resume
686        it if needed.
687        """
688
689        if self._exception is not None:
690            raise self._exception
691
692        if n == 0:
693            return b''
694
695        if n < 0:
696            # This used to just loop creating a new waiter hoping to
697            # collect everything in self._buffer, but that would
698            # deadlock if the subprocess sends more than self.limit
699            # bytes.  So just call self.read_nearly() until EOF.
700            blocks = []
701            while True:
702                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
703                if not block:
704                    break
705                blocks.append(block)
706            return b''.join(blocks)
707
708        if not self._smart_buffer.size() and not self._eof:
709            await self._wait_for_data('read')
710
711        # This will work right even if buffer is less than n bytes
712        data = self._smart_buffer.get_data_nearly(n)
713
714        self._maybe_resume_transport()
715        return data

Read up to n bytes from the stream.

If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return empty bytes object immediately.

If n is positive, this function tries to read n bytes and may return fewer bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
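
Note: the body differs from read() only in the final step. read() consumes exactly min(n, buffered) bytes via get_data(), while read_nearly() delegates to get_data_nearly(n), which, judging by its name, returns roughly n bytes without splitting buffered pieces at an exact byte boundary; the precise rounding behavior is defined by the DynamicListOfPiecesDequeWithLengthControl container, not documented here.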

async def readexactly(self, n):
717    async def readexactly(self, n):
718        """Read exactly `n` bytes.
719
720        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
721        read. The IncompleteReadError.partial attribute of the exception will
722        contain the partial read bytes.
723
724        If n is zero, return an empty bytes object.
725
726        The returned value is not limited by the `limit` configured at
727        stream creation.
728
729        If the stream was paused, this function will automatically resume
730        it if needed.
731        """
732        if n < 0:
733            raise ValueError('readexactly size can not be less than zero')
734
735        if self._exception is not None:
736            raise self._exception
737
738        if n == 0:
739            return b''
740
741        while self._smart_buffer.size() < n:
742            if self._eof:
743                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
744                raise IncompleteReadError(incomplete, n)
745
746            await self._wait_for_data('readexactly')
747
748        if self._smart_buffer.size() == n:
749            data = self._smart_buffer.get_data(self._smart_buffer.size())
750        else:
751            data = self._smart_buffer.get_data(n)
752
753        self._maybe_resume_transport()
754        return data

Read exactly n bytes.

Raise an IncompleteReadError if EOF is reached before n bytes can be read. The IncompleteReadError.partial attribute of the exception will contain the partial read bytes.

If n is zero, return an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
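
A hedged framing sketch built on readexactly() (`read_frame` and the fixed 4-byte width are illustrative and unrelated to the module's message_size_len):

    from asyncio import IncompleteReadError

    async def read_frame(reader):
        try:
            header = await reader.readexactly(4)  # little-endian length
            return await reader.readexactly(int.from_bytes(header, 'little'))
        except IncompleteReadError as e:
            raise ConnectionError(
                f'peer closed mid-frame after {len(e.partial)} bytes') from e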

async def readonly_exactly(self, n):
756    async def readonly_exactly(self, n):
757        if n < 0:
758            raise ValueError('readexactly size can not be less than zero')
759
760        if self._exception is not None:
761            raise self._exception
762
763        if n == 0:
764            return b''
765
766        while self._smart_buffer.size() < n:
767            if self._eof:
768                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
769                raise IncompleteReadError(incomplete, n)
770
771            await self._wait_for_data('readexactly')
772
773        if self._smart_buffer.size() == n:
774            data = self._smart_buffer.read_data(self._smart_buffer.size())
775        else:
776            data = self._smart_buffer.read_data(n)
777
778        self._maybe_resume_transport()
779        return data
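
readonly_exactly() has no docstring; it mirrors readexactly() but calls the smart buffer's read_data() instead of get_data(), which, as the "readonly" prefix suggests, appears to return the bytes without consuming them, so a subsequent read sees the same data. Treat this as an inference from the surrounding code rather than a documented guarantee.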
async def read_message(self):
781    async def read_message(self):
782        message_len_encoded = await self.readexactly(self._message_protocol_settings.message_size_len)
783        message_len = int.from_bytes(message_len_encoded, 'little')
784        return await self.readexactly(message_len)
def message_awailable(self) -> bool:
786    def message_awailable(self) -> bool:
787        message_size_len = self._message_protocol_settings.message_size_len
788        if self._smart_buffer.size() < message_size_len:
789            return False
790
791        message_len_encoded = self._smart_buffer.read_data(message_size_len)  # peek; get_data() would consume the length prefix
792        message_len = int.from_bytes(message_len_encoded, 'little')
793        if self._smart_buffer.size() < (message_size_len + message_len):
794            return False
795        
796        return True
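
Together with the writer's send_message(), these two methods implement a length-prefixed framing: message_size_len bytes of little-endian payload length, then the payload itself. A hedged sketch (`echo_messages` is hypothetical; reader and writer are assumed to come from this module's manager):

    from asyncio import IncompleteReadError

    async def echo_messages(reader, writer):
        # message_awailable() can be polled first to avoid awaiting
        # when no complete frame is buffered yet.
        try:
            while True:
                request = await reader.read_message()   # one complete frame
                await writer.send_message(request)      # frame + full drain
        except IncompleteReadError:
            pass  # peer closed the connection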
def transport_pause_reading(self):
798    def transport_pause_reading(self):
799        try:
800            self._transport.pause_reading()
801        except NotImplementedError:
802            # The transport can't be paused.
803            # We'll just have to buffer all data.
804            # Nothing else to do here; keep buffering.
805            pass
806        else:
807            self._paused = True
def transport_resume_reading(self):
809    def transport_resume_reading(self):
810        self._paused = False
811        self._transport.resume_reading()
Inherited Members
asyncio.streams.StreamReader
exception
set_exception
set_transport
feed_eof
class TcpStreamReaderProtocol(asyncio.streams.StreamReaderProtocol, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderProtocolAbstract):
841class TcpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract):
842    def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
843        self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs)
844
845    def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
846        super().__init__(*args, **kwargs)
847
848        self._stream_manager = manager
849        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
850
851    def connection_made(self, transport):
852        if self._reject_connection:
853            context = {
854                'message': ('An open stream was garbage collected prior to '
855                            'establishing network connection; '
856                            'call "stream.close()" explicitly.')
857            }
858            if self._source_traceback:
859                context['source_traceback'] = self._source_traceback
860            self._loop.call_exception_handler(context)
861            transport.abort()
862            return
863        
864        self._transport = transport
865        reader = self._stream_reader
866        if reader is not None:
867            reader.set_transport(transport)
868
869        self._over_ssl = transport.get_extra_info('sslcontext') is not None
870        if self._client_connected_cb is not None:
871            self._stream_writer = TcpStreamWriter(transport, self,
872                                               reader,
873                                               self._loop)
874            res = self._client_connected_cb(reader,
875                                            self._stream_writer)
876            if coroutines.iscoroutine(res):
877                self._loop.create_task(res)
878            
879            self._strong_reader = None

Helper class to adapt between Protocol and StreamReader.

(This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader from accidentally calling inappropriate methods of the protocol.)

TcpStreamReaderProtocol( manager: TcpStreamManager, message_protocol_settings: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_base_internal.MessageProtocolSettings, *args, **kwargs)
842    def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
843        self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs)
def connection_made(self, transport):
851    def connection_made(self, transport):
852        if self._reject_connection:
853            context = {
854                'message': ('An open stream was garbage collected prior to '
855                            'establishing network connection; '
856                            'call "stream.close()" explicitly.')
857            }
858            if self._source_traceback:
859                context['source_traceback'] = self._source_traceback
860            self._loop.call_exception_handler(context)
861            transport.abort()
862            return
863        
864        self._transport = transport
865        reader = self._stream_reader
866        if reader is not None:
867            reader.set_transport(transport)
868
869        self._over_ssl = transport.get_extra_info('sslcontext') is not None
870        if self._client_connected_cb is not None:
871            self._stream_writer = TcpStreamWriter(transport, self,
872                                               reader,
873                                               self._loop)
874            res = self._client_connected_cb(reader,
875                                            self._stream_writer)
876            if coroutines.iscoroutine(res):
877                self._loop.create_task(res)
878            
879            self._strong_reader = None

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

Inherited Members
asyncio.streams.StreamReaderProtocol
connection_lost
data_received
eof_received
asyncio.streams.FlowControlMixin
pause_writing
resume_writing
class TcpStreamWriter(asyncio.streams.StreamWriter, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract):
1008class TcpStreamWriter(OriginalStreamWriter, StreamWriterAbstract):
1009    def __init__(self, *args, **kwargs) -> None:
1010        self._bind_to_stream_manager(*args, **kwargs)
1011
1012    def _bind_to_stream_manager(self, *args, **kwargs) -> None:
1013        super().__init__(*args, **kwargs)
1014
1015        self._stream_manager: TcpStreamManager = self._protocol._stream_manager
1016        self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
1017            external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
1018        self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
1019        self._autonomous_writer_future: Task = None
1020        self._autonomous_writer_future_stop_requessted: bool = False
1021        self._autonomous_writer_drain_enough_futures: List[Future] = list()
1022
1023    def optimized_write(self, data):
1024        self._output_to_client.add_piece_of_data(data)
1025        # self.write(data)
1026
1027    def owrite(self, data):
1028        return self.optimized_write(data)
1029
1030    async def partial_drain(self):
1031        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1032        while another_piece_of_data:
1033            self.write(another_piece_of_data)
1034            await self.drain()
1035            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1036
1037    async def pdrain(self):
1038        return await self.partial_drain()
1039
1040    async def full_drain(self):
1041        await self.pdrain()
1042        rest_of_the_data_size = self._output_to_client.size()
1043        if rest_of_the_data_size:
1044            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
1045            self.write(another_piece_of_data)
1046            await self.drain()
1047
1048    async def fdrain(self):
1049        return await self.full_drain()
1050    
1051    def _release_autonomous_writer_waiters(self):
1052        current_size = self._output_to_client.size()
1053        autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures
1054        self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)()
1055        for item in autonomous_writer_drain_enough_futures_buff:
1056            lower_water_size, future = item
1057            if current_size < lower_water_size:
1058                if (not future.done()) and (not future.cancelled()):
1059                    future.set_result(None)
1060
1061            if (not future.done()) and (not future.cancelled()):
1062                self._autonomous_writer_drain_enough_futures.append(item)
1063    
1064    async def _autonomous_writer_impl(self):
1065        ty = TimedYield(0)
1066        while not self._autonomous_writer_future_stop_requessted:
1067            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1068            self._release_autonomous_writer_waiters()
1069            while another_piece_of_data:
1070                self.write(another_piece_of_data)
1071                await self.drain()
1072                another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1073                self._release_autonomous_writer_waiters()
1074            
1075            await ty()
1076    
1077    def start_autonomous_writer(self):
1078        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)
1079    
1080    def start_aw(self):
1081        return self.start_autonomous_writer()
1082    
1083    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
1084        """_summary_
1085
1086        Args:
1087            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
1088
1089        Returns:
1090            _type_: _description_
1091        """
1092        result = None
1093        if timeout is None:
1094            timeout = self._stream_manager.autonomous_writer_stop_default_timeout
1095        
1096        if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
1097            self._autonomous_writer_future_stop_requessted = True
1098            if timeout:
1099                result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
1100            else:
1101                result = await self._autonomous_writer_future
1102            
1103            self._autonomous_writer_future = None
1104            self._autonomous_writer_future_stop_requessted = False
1105        
1106        return result
1107    
1108    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
1109        """_summary_
1110
1111        Args:
1112            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
1113
1114        Returns:
1115            _type_: _description_
1116        """
1117        return await self.stop_autonomous_writer(timeout)
1118    
1119    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
1120        if lower_water_size is None:
1121            lower_water_size = self.socket_write_fixed_buffer_size.value * 3
1122            # print(f'lower_water_size: {lower_water_size}')
1123            # lower_water_size = cpu_info().l3_cache_size
1124        
1125        if lower_water_size <= self._output_to_client.size():
1126            future: Future = self._loop.create_future()
1127            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
1128            await future
1129    
1130    async def aw_drain_enough(self):
1131        await self.autonomous_writer_drain_enough()
1132    
1133    def optimized_write_message(self, data):
1134        self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings.message_size_len, 'little') + data)
1135    
1136    def owrite_message(self, data):
1137        self.optimized_write_message(data)
1138    
1139    async def send_message(self, data):
1140        self.optimized_write_message(data)
1141        await self.fdrain()
1142    
1143    @asynccontextmanager
1144    async def autonomous_writer(self):
1145        self.start_autonomous_writer()
1146        try:
1147            yield
1148        finally:
1149            await self.stop_autonomous_writer()
1150    
1151    aw = autonomous_writer
1152
1153    def __enter__(self):
1154        pass
1155
1156    def __exit__(self, exc_type, exc, tb):
1157        self.close()

Wraps a Transport.

This exposes write(), writelines(), [can_]write_eof(), get_extra_info() and close(). It adds drain() which returns an optional Future on which you can wait for flow control. It also adds a transport property which references the Transport directly.
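
The optimized write path batches application writes: optimized_write()/owrite() only append to the writer-side buffer, partial_drain()/pdrain() push it to the socket in socket_write_fixed_buffer_size chunks, and full_drain()/fdrain() flush whatever remains. A hedged sketch (`send_all` is hypothetical):

    async def send_all(writer, payloads):
        for payload in payloads:
            writer.owrite(payload)   # buffered only; no transport write yet
        await writer.fdrain()        # chunked drain, then flush the remainder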

TcpStreamWriter(*args, **kwargs)
1009    def __init__(self, *args, **kwargs) -> None:
1010        self._bind_to_stream_manager(*args, **kwargs)
def optimized_write(self, data):
1023    def optimized_write(self, data):
1024        self._output_to_client.add_piece_of_data(data)
1025        # self.write(data)
def owrite(self, data):
1027    def owrite(self, data):
1028        return self.optimized_write(data)
async def partial_drain(self):
1030    async def partial_drain(self):
1031        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1032        while another_piece_of_data:
1033            self.write(another_piece_of_data)
1034            await self.drain()
1035            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
async def pdrain(self):
1037    async def pdrain(self):
1038        return await self.partial_drain()
async def full_drain(self):
1040    async def full_drain(self):
1041        await self.pdrain()
1042        rest_of_the_data_size = self._output_to_client.size()
1043        if rest_of_the_data_size:
1044            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
1045            self.write(another_piece_of_data)
1046            await self.drain()
async def fdrain(self):
1048    async def fdrain(self):
1049        return await self.full_drain()
def start_autonomous_writer(self):
1077    def start_autonomous_writer(self):
1078        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)
def start_aw(self):
1080    def start_aw(self):
1081        return self.start_autonomous_writer()
async def stop_autonomous_writer(self, timeout: Union[int, float, NoneType] = 0):
1083    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
1084        """_summary_
1085
1086        Args:
1087            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
1088
1089        Returns:
1090            _type_: _description_
1091        """
1092        result = None
1093        if timeout is None:
1094            timeout = self._stream_manager.autonomous_writer_stop_default_timeout
1095        
1096        if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
1097            self._autonomous_writer_future_stop_requessted = True
1098            if timeout:
1099                result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
1100            else:
1101                result = await self._autonomous_writer_future
1102            
1103            self._autonomous_writer_future = None
1104            self._autonomous_writer_future_stop_requessted = False
1105        
1106        return result

Stop the autonomous writer task and await its completion.

Args: timeout (Optional[Union[int, float]], optional): Defaults to 0. 0 - infinite timeout (wait until the task finishes); None - use the stream manager's default timeout

Returns: The autonomous writer task's result, or None if no task was running.

async def stop_aw(self, timeout: Union[int, float, NoneType] = 0):
1108    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
1109        """_summary_
1110
1111        Args:
1112            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
1113
1114        Returns:
1115            _type_: _description_
1116        """
1117        return await self.stop_autonomous_writer(timeout)

Alias for stop_autonomous_writer().

Args: timeout (Optional[Union[int, float]], optional): Defaults to 0. 0 - infinite timeout (wait until the task finishes); None - use the stream manager's default timeout

Returns: The autonomous writer task's result, or None if no task was running.

async def autonomous_writer_drain_enough(self, lower_water_size: Union[int, NoneType] = None):
1119    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
1120        if lower_water_size is None:
1121            lower_water_size = self.socket_write_fixed_buffer_size.value * 3
1122            # print(f'lower_water_size: {lower_water_size}')
1123            # lower_water_size = cpu_info().l3_cache_size
1124        
1125        if lower_water_size <= self._output_to_client.size():
1126            future: Future = self._loop.create_future()
1127            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
1128            await future
async def aw_drain_enough(self):
1130    async def aw_drain_enough(self):
1131        await self.autonomous_writer_drain_enough()
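
autonomous_writer_drain_enough() is the backpressure hook for the autonomous writer: if the buffered output has reached lower_water_size (three times the fixed socket write buffer by default, per the source above), the caller is parked on a future that _release_autonomous_writer_waiters() resolves once the buffer drops below that mark. A hedged producer sketch (`produce` is hypothetical and assumes the autonomous writer task is running):

    async def produce(writer, blocks):
        for block in blocks:
            writer.owrite(block)
            await writer.autonomous_writer_drain_enough()  # waits only if the buffer is full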
def optimized_write_message(self, data):
1133    def optimized_write_message(self, data):
1134        self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings.message_size_len, 'little') + data)
def owrite_message(self, data):
1136    def owrite_message(self, data):
1137        self.optimized_write_message(data)
async def send_message(self, data):
1139    async def send_message(self, data):
1140        self.optimized_write_message(data)
1141        await self.fdrain()
@asynccontextmanager
async def autonomous_writer(self):
1143    @asynccontextmanager
1144    async def autonomous_writer(self):
1145        self.start_autonomous_writer()
1146        try:
1147            yield
1148        finally:
1149            await self.stop_autonomous_writer()
@asynccontextmanager
async def aw(self):
1143    @asynccontextmanager
1144    async def autonomous_writer(self):
1145        self.start_autonomous_writer()
1146        try:
1147            yield
1148        finally:
1149            await self.stop_autonomous_writer()
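
autonomous_writer() (alias: aw) wraps the start/stop pair in an async context manager, so the background flush task is stopped even if the body raises. A hedged sketch (`stream_out` is hypothetical):

    async def stream_out(writer, blocks):
        async with writer.autonomous_writer():  # starts the flush task
            for block in blocks:
                writer.owrite(block)
                await writer.aw_drain_enough()  # optional backpressure
        # on exit: stop_autonomous_writer() is awaited (timeout 0 = wait indefinitely)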
Inherited Members
asyncio.streams.StreamWriter
transport
write
writelines
write_eof
can_write_eof
close
is_closing
wait_closed
get_extra_info
drain