cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.tcp_efficient_streams
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = ['StreamType', 'GateSecurityPolicy', 'StreamManagerIOCoreMemoryManagement', 'TcpStreamManager', 'TcpStreamReader', 'TcpStreamReaderProtocol', 'TcpStreamWriter'] 38 39 40import warnings 41import asyncio 42from asyncio.exceptions import IncompleteReadError, LimitOverrunError 43from asyncio import streams 44from asyncio.streams import StreamWriter as OriginalStreamWriter 45from asyncio.streams import StreamReader as OriginalStreamReader 46from asyncio.streams import StreamReaderProtocol as OriginalStreamReaderProtocol 47from asyncio.sslproto import SSLProtocol, _SSLProtocolTransport 48from asyncio.proactor_events import _ProactorSocketTransport 49from asyncio.selector_events import _SelectorSocketTransport 50from asyncio import events 51from asyncio import proactor_events 52from asyncio import selector_events 53try: 54 from asyncio import unix_events 55except ImportError: 56 pass 57 58from asyncio import coroutines 59from asyncio.tasks import sleep, Task 60from asyncio.futures import Future 61from copy import copy 62from enum import Enum 63from cengal.io.asock_io.versions.v_1.recv_buff_size_computer.recv_buff_size_computer__python import RecvBuffSizeComputer 64# from cengal.io.asock_io.versions.v_1.base import IOCoreMemoryManagement 65from cengal.parallel_execution.asyncio.atasks import create_task 66from cengal.parallel_execution.asyncio.timed_yield import TimedYield 67from cengal.hardware.info.cpu import cpu_info 68# from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl 69# from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty 70# from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType 71from cengal.data_containers.dynamic_list_of_pieces.versions.v_1.dynamic_list_of_pieces__python import DynamicListOfPiecesDequeWithLengthControl 72from cengal.code_flow_control.smart_values.versions import ValueExistence 73from cengal.data_manipulation.conversion.reinterpret_cast import reinterpret_cast 74from typing import Sequence, Tuple, Type, Set, Optional, Union, List, cast 75from .efficient_streams_base_internal import * 76from .efficient_streams_base import * 77from .efficient_streams_abstract import * 78 79from contextlib import asynccontextmanager 80 81 82class TcpStreamManager(StreamManagerAbstract): 83 def __init__(self) -> None: 84 self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement() 85 self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0 86 self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl 87 self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl 88 89 async def open_connection(self, host=None, port=None, *, 90 loop=None, limit=DEFAULT_LIMIT, 91 stream_type: StreamType = StreamType.general, stream_name: str = str(), 92 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 93 max_message_size_len: Optional[int] = None, 94 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 95 """A wrapper for create_connection() returning a (reader, writer) pair. 96 97 The reader returned is a TcpStreamReader instance; the writer is a 98 TcpStreamWriter instance. 99 100 The arguments are all the usual arguments to create_connection() 101 except protocol_factory; most common are positional host and port, 102 with various optional keyword arguments following. 103 104 Additional optional keyword arguments are loop (to set the event loop 105 instance to use) and limit (to set the buffer limit passed to the 106 TcpStreamReader). 107 108 (If you want to customize the TcpStreamReader and/or 109 TcpStreamReaderProtocol classes, just copy the code -- there's 110 really nothing special here except some convenience.) 111 """ 112 if StreamType.gate == stream_type: 113 raise ValueError(f'Wrong stream_type value: client can not be a "gate".') 114 115 if loop is None: 116 loop = events.get_event_loop() 117 else: 118 warnings.warn("The loop argument is deprecated since Python 3.8, " 119 "and scheduled for removal in Python 3.10.", 120 DeprecationWarning, stacklevel=2) 121 122 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 123 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 124 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop) 125 transport, _ = await loop.create_connection( 126 lambda: protocol, host, port, **kwds) 127 writer = TcpStreamWriter(transport, protocol, reader, loop) 128 return reader, writer 129 130 def bind_existing_connection(self, reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 131 stream_type: StreamType = StreamType.general, stream_name: str = str(), 132 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 133 max_message_size_len: Optional[int] = None, 134 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 135 reader = reinterpret_cast(TcpStreamReader, reader) 136 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 137 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 138 139 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 140 protocol._bind_to_stream_manager(message_protocol_settings, reader, loop=loop) 141 142 transport = writer._transport 143 if isinstance(transport, _SSLProtocolTransport): 144 transport = cast(_SSLProtocolTransport, transport) 145 ssl_protocol: SSLProtocol = transport._ssl_protocol 146 ssl_protocol._set_app_protocol(protocol) 147 elif isinstance(transport, _ProactorSocketTransport): 148 transport = cast(_ProactorSocketTransport, transport) 149 transport.set_protocol(protocol) 150 elif isinstance(transport, _SelectorSocketTransport): 151 transport = cast(_SelectorSocketTransport, transport) 152 transport.set_protocol(protocol) 153 else: 154 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 155 156 writer = reinterpret_cast(TcpStreamWriter, writer) 157 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 158 159 return reader, writer 160 161 async def start_server(self, client_connected_cb, host=None, port=None, *, 162 loop=None, limit=DEFAULT_LIMIT, 163 stream_type: StreamType = StreamType.general, stream_name: str = str(), 164 gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None, 165 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 166 max_message_size_len: Optional[int] = None, 167 **kwds): 168 """Start a socket server, call back for each client connected. 169 170 The first parameter, `client_connected_cb`, takes two parameters: 171 client_reader, client_writer. client_reader is a TcpStreamReader 172 object, while client_writer is a TcpStreamWriter object. This 173 parameter can either be a plain callback function or a coroutine; 174 if it is a coroutine, it will be automatically converted into a 175 Task. 176 177 The rest of the arguments are all the usual arguments to 178 loop.create_server() except protocol_factory; most common are 179 positional host and port, with various optional keyword arguments 180 following. The return value is the same as loop.create_server(). 181 182 Additional optional keyword arguments are loop (to set the event loop 183 instance to use) and limit (to set the buffer limit passed to the 184 TcpStreamReader). 185 186 The return value is the same as loop.create_server(), i.e. a 187 Server object which can be used to stop the service. 188 """ 189 if loop is None: 190 loop = events.get_event_loop() 191 else: 192 warnings.warn("The loop argument is deprecated since Python 3.8, " 193 "and scheduled for removal in Python 3.10.", 194 DeprecationWarning, stacklevel=2) 195 196 def factory(): 197 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 198 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 199 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb, 200 loop=loop) 201 return protocol 202 203 return await loop.create_server(factory, host, port, **kwds) 204 205 def bind_accepted_connection(self, 206 reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 207 stream_type: StreamType = StreamType.general, stream_name: str = str(), 208 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 209 max_message_size_len: Optional[int] = None, 210 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 211 reader = reinterpret_cast(TcpStreamReader, reader) 212 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 213 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 214 215 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 216 client_connected_cb = protocol._client_connected_cb 217 protocol._bind_to_stream_manager(message_protocol_settings, reader, client_connected_cb, loop=loop) 218 219 transport = writer._transport 220 if isinstance(transport, _SSLProtocolTransport): 221 transport = cast(_SSLProtocolTransport, transport) 222 ssl_protocol: SSLProtocol = transport._ssl_protocol 223 ssl_protocol._set_app_protocol(protocol) 224 elif isinstance(transport, _ProactorSocketTransport): 225 transport = cast(_ProactorSocketTransport, transport) 226 transport.set_protocol(protocol) 227 elif isinstance(transport, _SelectorSocketTransport): 228 transport = cast(_SelectorSocketTransport, transport) 229 transport.set_protocol(protocol) 230 else: 231 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 232 233 writer = reinterpret_cast(TcpStreamWriter, writer) 234 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 235 236 return reader, writer 237 238 async def try_establish_message_protocol_server_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 239 message_size_len_encoded = await reader.readonly_exactly(1) 240 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 241 can_be_established: bool = False 242 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 243 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 244 message_size = int.from_bytes(message_size_encoded, 'little') 245 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 246 if message == reader._message_protocol_settings.protocol_greeting: 247 can_be_established = True 248 249 if can_be_established: 250 reader._message_protocol_settings.message_size_len = message_size_len 251 writer._protocol._message_protocol_settings.message_size_len = message_size_len 252 await reader.readexactly(1) 253 await reader.read_message() 254 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 255 writer.write(message) 256 await writer.full_drain() 257 return True 258 else: 259 return False 260 261 async def try_establish_message_protocol_client_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 262 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 263 writer.write(message) 264 await writer.full_drain() 265 message_size_len_encoded = await reader.readonly_exactly(1) 266 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 267 can_be_established: bool = False 268 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 269 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 270 message_size = int.from_bytes(message_size_encoded, 'little') 271 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 272 if message == reader._message_protocol_settings.protocol_greeting: 273 can_be_established = True 274 275 if can_be_established: 276 reader._message_protocol_settings.message_size_len = message_size_len 277 writer._protocol._message_protocol_settings.message_size_len = message_size_len 278 await reader.readexactly(1) 279 await reader.read_message() 280 return True 281 else: 282 return False 283 284 285# def classes_with_amax_size() -> Tuple[Type]: 286# types = list() 287# try: 288# pass 289# except AttributeError: 290# pass 291 292 293# class StreamReaderCopy(OriginalStreamReader): 294# # def __init__(self, limit: int, loop: events.AbstractEventLoop) -> None: 295# # self.stream_manager = None 296# # super().__init__(limit, loop) 297 298# def __init__(self, manager: TcpStreamManager, original_stream_reader: OriginalStreamReader) -> None: 299# self._stream_manager = manager 300# self.recv_buff_size_computer = RecvBuffSizeComputer() 301# cpu_info_inst = cpu_info() 302# # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 303# # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 304# # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 305# # self.recv_buff_size_computer.max_recv_buff_size = 3145728 306# self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 307# # self.recv_buff_size_computer.max_recv_buff_size = 1024 308# print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 309# original_dict: dict = copy(original_stream_reader.__dict__) 310# original_dict.pop('feed_data', None) 311# original_dict.pop('_stream_manager', None) 312# self.__dict__.update(original_dict) 313 314# def feed_data(self, data): 315# assert not self._eof, 'feed_data after feed_eof' 316 317# if not data: 318# return 319 320# data_len = len(data) 321# self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 322# self._buffer.extend(data) 323# self._wakeup_waiter() 324 325# if (self._transport is not None and 326# not self._paused and 327# len(self._buffer) > 2 * self._limit): 328# try: 329# self._transport.pause_reading() 330# except NotImplementedError: 331# # The transport can't be paused. 332# # We'll just have to buffer all data. 333# # Forget the transport so we don't keep trying. 334# self._transport = None 335# else: 336# self._paused = True 337 338# def _maybe_resume_transport(self): 339# if isinstance(self._transport, ( 340# proactor_events._ProactorDatagramTransport, 341# selector_events._SelectorTransport, 342# unix_events._UnixReadPipeTransport 343# )): 344# # if hasattr(self._transport, 'max_size'): 345# try: 346# self._transport.max_size = self.recv_buff_size_computer.recv_buff_size 347# # print(f'max_size: {self._transport.max_size}') 348# except AttributeError: 349# pass 350# else: 351# print(f'Unsupported transport: {type(self._transport)}') 352 353# if self._paused and len(self._buffer) <= self._limit: 354# self._paused = False 355# self._transport.resume_reading() 356 357# async def read_with_counter(self): 358# if self._exception is not None: 359# raise self._exception 360 361# # This used to just loop creating a new waiter hoping to 362# # collect everything in self._buffer, but that would 363# # deadlock if the subprocess sends more than self.limit 364# # bytes. So just call self.read(self._limit) until EOF. 365# blocks = [] 366# counter = 0 367# while True: 368# block = await self.read(self._limit) 369# counter += 1 370# if not block: 371# break 372# blocks.append(block) 373# return b''.join(blocks), counter 374 375 376class TcpStreamReader(OriginalStreamReader, StreamReaderAbstract): 377 def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 378 self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs) 379 380 def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 381 super().__init__(*args, **kwargs) 382 self._stream_manager = manager 383 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 384 self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type( 385 external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size) 386 self.recv_buff_size_computer = RecvBuffSizeComputer() 387 cpu_info_inst = cpu_info() 388 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 389 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 390 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 391 # self.recv_buff_size_computer.max_recv_buff_size = 3145728 392 self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 393 # self.recv_buff_size_computer.max_recv_buff_size = 1024 394 # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 395 self.limit_by_limit: bool = True 396 self.limit_by_global_in__data_size_limit: bool = True 397 398 async def read_max(self): 399 return await self.read(self._limit) 400 401 async def read_nearly_max(self): 402 return await self.read_nearly(self._limit) 403 404 async def read_with_counter(self): 405 if self._exception is not None: 406 raise self._exception 407 408 # This used to just loop creating a new waiter hoping to 409 # collect everything in self._buffer, but that would 410 # deadlock if the subprocess sends more than self.limit 411 # bytes. So just call self.read(self._limit) until EOF. 412 blocks = [] 413 counter = 0 414 while True: 415 block = await self.read_max() 416 counter += 1 417 if not block: 418 break 419 blocks.append(block) 420 return b''.join(blocks), counter 421 422 def __repr__(self): 423 info = ['TcpStreamReader'] 424 if self._smart_buffer.size(): 425 info.append(f'{self._smart_buffer.size()} bytes') 426 if self._eof: 427 info.append('eof') 428 if self._limit != DEFAULT_LIMIT: 429 info.append(f'limit={self._limit}') 430 if self._waiter: 431 info.append(f'waiter={self._waiter!r}') 432 if self._exception: 433 info.append(f'exception={self._exception!r}') 434 if self._transport: 435 info.append(f'transport={self._transport!r}') 436 if self._paused: 437 info.append('paused') 438 return '<{}>'.format(' '.join(info)) 439 440 def _maybe_resume_transport(self): 441 if isinstance(self._transport, ( 442 proactor_events._ProactorDatagramTransport, 443 selector_events._SelectorTransport, 444 unix_events._UnixReadPipeTransport 445 )): 446 # if hasattr(self._transport, 'max_size'): 447 try: 448 self._transport.max_size = self.recv_buff_size_computer.recv_buff_size 449 # print(f'max_size: {self._transport.max_size}') 450 except AttributeError: 451 pass 452 else: 453 print(f'Unsupported transport: {type(self._transport)}') 454 455 if self._paused \ 456 and ( 457 ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \ 458 or (self.limit_by_limit and (not self._limit)) \ 459 or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \ 460 or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \ 461 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value)) 462 ): 463 self._paused = False 464 self._transport.resume_reading() 465 466 def at_eof(self): 467 """Return True if the buffer is empty and 'feed_eof' was called.""" 468 return self._eof and not self._smart_buffer.size() 469 470 def feed_data(self, data): 471 assert not self._eof, 'feed_data after feed_eof' 472 473 if not data: 474 return 475 476 data_len = len(data) 477 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 478 self._smart_buffer.add_piece_of_data(data) 479 self._wakeup_waiter() 480 481 if (self._transport is not None and 482 not self._paused 483 and ( 484 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 485 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 486 )): 487 try: 488 self._transport.pause_reading() 489 except NotImplementedError: 490 # The transport can't be paused. 491 # We'll just have to buffer all data. 492 # Forget the transport so we don't keep trying. 493 self._transport = None 494 else: 495 self._paused = True 496 497 async def readline(self): 498 """Read chunk of data from the stream until newline (b'\n') is found. 499 500 On success, return chunk that ends with newline. If only partial 501 line can be read due to EOF, return incomplete line without 502 terminating newline. When EOF was reached while no bytes read, empty 503 bytes object is returned. 504 505 If limit is reached, ValueError will be raised. In that case, if 506 newline was found, complete line including newline will be removed 507 from internal buffer. Else, internal buffer will be cleared. Limit is 508 compared against part of the line without newline. 509 510 If stream was paused, this function will automatically resume it if 511 needed. 512 """ 513 sep = b'\n' 514 seplen = len(sep) 515 try: 516 line = await self.readuntil(sep) 517 except IncompleteReadError as e: 518 return e.partial 519 except LimitOverrunError as e: 520 if self._smart_buffer.startswith(sep, e.consumed): 521 self._smart_buffer.get_data(e.consumed + seplen) 522 else: 523 self._smart_buffer.clear() 524 525 self._maybe_resume_transport() 526 raise ValueError(e.args[0]) 527 return line 528 529 async def readuntil(self, separator=b'\n'): 530 """Read data from the stream until ``separator`` is found. 531 532 On success, the data and separator will be removed from the 533 internal buffer (consumed). Returned data will include the 534 separator at the end. 535 536 Configured stream limit is used to check result. Limit sets the 537 maximal length of data that can be returned, not counting the 538 separator. 539 540 If an EOF occurs and the complete separator is still not found, 541 an IncompleteReadError exception will be raised, and the internal 542 buffer will be reset. The IncompleteReadError.partial attribute 543 may contain the separator partially. 544 545 If the data cannot be read because of over limit, a 546 LimitOverrunError exception will be raised, and the data 547 will be left in the internal buffer, so it can be read again. 548 """ 549 seplen = len(separator) 550 if seplen == 0: 551 raise ValueError('Separator should be at least one-byte string') 552 553 if self._exception is not None: 554 raise self._exception 555 556 # Consume whole buffer except last bytes, which length is 557 # one less than seplen. Let's check corner cases with 558 # separator='SEPARATOR': 559 # * we have received almost complete separator (without last 560 # byte). i.e buffer='some textSEPARATO'. In this case we 561 # can safely consume len(separator) - 1 bytes. 562 # * last byte of buffer is first byte of separator, i.e. 563 # buffer='abcdefghijklmnopqrS'. We may safely consume 564 # everything except that last byte, but this require to 565 # analyze bytes of buffer that match partial separator. 566 # This is slow and/or require FSM. For this case our 567 # implementation is not optimal, since require rescanning 568 # of data that is known to not belong to separator. In 569 # real world, separator will not be so long to notice 570 # performance problems. Even when reading MIME-encoded 571 # messages :) 572 573 # `offset` is the number of bytes from the beginning of the buffer 574 # where there is no occurrence of `separator`. 575 offset = 0 576 577 # Loop until we find `separator` in the buffer, exceed the buffer size, 578 # or an EOF has happened. 579 while True: 580 buflen = self._smart_buffer.size() 581 582 # Check if we now have enough data in the buffer for `separator` to 583 # fit. 584 if buflen - offset >= seplen: 585 isep = self._smart_buffer.find(separator, offset) 586 587 if isep != -1: 588 # `separator` is in the buffer. `isep` will be used later 589 # to retrieve the data. 590 break 591 592 # see upper comment for explanation. 593 offset = buflen + 1 - seplen 594 if offset > self._limit: 595 raise LimitOverrunError( 596 'Separator is not found, and chunk exceed the limit', 597 offset) 598 599 # Complete message (with full separator) may be present in buffer 600 # even when EOF flag is set. This may happen when the last chunk 601 # adds data which makes separator be found. That's why we check for 602 # EOF *ater* inspecting the buffer. 603 if self._eof: 604 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 605 raise IncompleteReadError(chunk, None) 606 607 # _wait_for_data() will resume reading if stream was paused. 608 await self._wait_for_data('readuntil') 609 610 if isep > self._limit: 611 raise LimitOverrunError( 612 'Separator is found, but chunk is longer than limit', isep) 613 614 chunk = self._smart_buffer.get_data(isep + seplen) 615 self._maybe_resume_transport() 616 return bytes(chunk) 617 618 async def read(self, n=-1): 619 """Read up to `n` bytes from the stream. 620 621 If n is not provided, or set to -1, read until EOF and return all read 622 bytes. If the EOF was received and the internal buffer is empty, return 623 an empty bytes object. 624 625 If n is zero, return empty bytes object immediately. 626 627 If n is positive, this function try to read `n` bytes, and may return 628 less or equal bytes than requested, but at least one byte. If EOF was 629 received before any byte is read, this function returns empty byte 630 object. 631 632 Returned value is not limited with limit, configured at stream 633 creation. 634 635 If stream was paused, this function will automatically resume it if 636 needed. 637 """ 638 639 if self._exception is not None: 640 raise self._exception 641 642 if n == 0: 643 return b'' 644 645 if n < 0: 646 # This used to just loop creating a new waiter hoping to 647 # collect everything in self._buffer, but that would 648 # deadlock if the subprocess sends more than self.limit 649 # bytes. So just call self.read(self._limit) until EOF. 650 blocks = [] 651 while True: 652 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 653 if not block: 654 break 655 blocks.append(block) 656 return b''.join(blocks) 657 658 if not self._smart_buffer.size() and not self._eof: 659 await self._wait_for_data('read') 660 661 # This will work right even if buffer is less than n bytes 662 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 663 664 self._maybe_resume_transport() 665 return data 666 667 async def read_nearly(self, n=-1): 668 """Read up to `n` bytes from the stream. 669 670 If n is not provided, or set to -1, read until EOF and return all read 671 bytes. If the EOF was received and the internal buffer is empty, return 672 an empty bytes object. 673 674 If n is zero, return empty bytes object immediately. 675 676 If n is positive, this function try to read `n` bytes, and may return 677 less or equal bytes than requested, but at least one byte. If EOF was 678 received before any byte is read, this function returns empty byte 679 object. 680 681 Returned value is not limited with limit, configured at stream 682 creation. 683 684 If stream was paused, this function will automatically resume it if 685 needed. 686 """ 687 688 if self._exception is not None: 689 raise self._exception 690 691 if n == 0: 692 return b'' 693 694 if n < 0: 695 # This used to just loop creating a new waiter hoping to 696 # collect everything in self._buffer, but that would 697 # deadlock if the subprocess sends more than self.limit 698 # bytes. So just call self.read(self._limit) until EOF. 699 blocks = [] 700 while True: 701 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 702 if not block: 703 break 704 blocks.append(block) 705 return b''.join(blocks) 706 707 if not self._smart_buffer.size() and not self._eof: 708 await self._wait_for_data('read') 709 710 # This will work right even if buffer is less than n bytes 711 data = self._smart_buffer.get_data_nearly(n) 712 713 self._maybe_resume_transport() 714 return data 715 716 async def readexactly(self, n): 717 """Read exactly `n` bytes. 718 719 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 720 read. The IncompleteReadError.partial attribute of the exception will 721 contain the partial read bytes. 722 723 if n is zero, return empty bytes object. 724 725 Returned value is not limited with limit, configured at stream 726 creation. 727 728 If stream was paused, this function will automatically resume it if 729 needed. 730 """ 731 if n < 0: 732 raise ValueError('readexactly size can not be less than zero') 733 734 if self._exception is not None: 735 raise self._exception 736 737 if n == 0: 738 return b'' 739 740 while self._smart_buffer.size() < n: 741 if self._eof: 742 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 743 raise IncompleteReadError(incomplete, n) 744 745 await self._wait_for_data('readexactly') 746 747 if self._smart_buffer.size() == n: 748 data = self._smart_buffer.get_data(self._smart_buffer.size()) 749 else: 750 data = self._smart_buffer.get_data(n) 751 752 self._maybe_resume_transport() 753 return data 754 755 async def readonly_exactly(self, n): 756 if n < 0: 757 raise ValueError('readexactly size can not be less than zero') 758 759 if self._exception is not None: 760 raise self._exception 761 762 if n == 0: 763 return b'' 764 765 while self._smart_buffer.size() < n: 766 if self._eof: 767 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 768 raise IncompleteReadError(incomplete, n) 769 770 await self._wait_for_data('readexactly') 771 772 if self._smart_buffer.size() == n: 773 data = self._smart_buffer.read_data(self._smart_buffer.size()) 774 else: 775 data = self._smart_buffer.read_data(n) 776 777 self._maybe_resume_transport() 778 return data 779 780 async def read_message(self): 781 message_len_encoded = await self.readexactly(self._message_protocol_settings.message_size_len) 782 message_len = int.from_bytes(message_len_encoded, 'little') 783 return await self.readexactly(message_len) 784 785 def message_awailable(self) -> bool: 786 message_size_len = self._message_protocol_settings.message_size_len 787 if self._smart_buffer.size() < message_size_len: 788 return False 789 790 message_len_encoded = self._smart_buffer.get_data(message_size_len) 791 message_len = int.from_bytes(message_len_encoded, 'little') 792 if self._smart_buffer.size() < (message_size_len + message_len): 793 return False 794 795 return True 796 797 def transport_pause_reading(self): 798 try: 799 self._transport.pause_reading() 800 except NotImplementedError: 801 # The transport can't be paused. 802 # We'll just have to buffer all data. 803 # Forget the transport so we don't keep trying. 804 pass 805 else: 806 self._paused = True 807 808 def transport_resume_reading(self): 809 self._paused = False 810 self._transport.resume_reading() 811 812 813# class StreamReaderProtocolCopy(OriginalStreamReaderProtocol): 814# def __init__(self, manager: TcpStreamManager, original_stream_reader_protocol: OriginalStreamReaderProtocol) -> None: 815# self._stream_manager = manager 816# original_dict: dict = copy(original_stream_reader_protocol.__dict__) 817# original_dict.pop('_stream_manager', None) 818# original_dict.pop('_client_connected_cb', None) 819# self.__dict__.update(original_dict) 820# self._original__client_connected_cb = None 821# self._client_connected_cb = original_stream_reader_protocol._client_connected_cb 822 823# async def _wrapper__client_connected_cb(self, reader: OriginalStreamReader, writer: OriginalStreamWriter): 824# self._stream_writer = StreamWriterCopy(self._stream_manager, writer) 825# if self._original__client_connected_cb is not None: 826# await self._original__client_connected_cb(reader, self._stream_writer) 827 828# @property 829# def _client_connected_cb(self): 830# if self._original__client_connected_cb is None: 831# return None 832# else: 833# return self._wrapper__client_connected_cb 834 835# @_client_connected_cb.setter 836# def _client_connected_cb(self, value): 837# self._original__client_connected_cb = value 838 839 840class TcpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract): 841 def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 842 self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs) 843 844 def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 845 super().__init__(*args, **kwargs) 846 847 self._stream_manager = manager 848 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 849 850 def connection_made(self, transport): 851 if self._reject_connection: 852 context = { 853 'message': ('An open stream was garbage collected prior to ' 854 'establishing network connection; ' 855 'call "stream.close()" explicitly.') 856 } 857 if self._source_traceback: 858 context['source_traceback'] = self._source_traceback 859 self._loop.call_exception_handler(context) 860 transport.abort() 861 return 862 863 self._transport = transport 864 reader = self._stream_reader 865 if reader is not None: 866 reader.set_transport(transport) 867 868 self._over_ssl = transport.get_extra_info('sslcontext') is not None 869 if self._client_connected_cb is not None: 870 self._stream_writer = TcpStreamWriter(transport, self, 871 reader, 872 self._loop) 873 res = self._client_connected_cb(reader, 874 self._stream_writer) 875 if coroutines.iscoroutine(res): 876 self._loop.create_task(res) 877 878 self._strong_reader = None 879 880 881# class StreamWriterCopy(OriginalStreamWriter): 882# def __init__(self, manager: TcpStreamManager, original_stream_writer: OriginalStreamWriter) -> None: 883# self._stream_manager = manager 884# self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 885# external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 886# self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 887# self._autonomous_writer_future: Task = None 888# self._autonomous_writer_future_stop_requessted: bool = False 889# self._autonomous_writer_drain_enough_futures: List[Future] = list() 890# original_dict: dict = copy(original_stream_writer.__dict__) 891# original_dict.pop('_stream_manager', None) 892# original_dict.pop('_output_to_client', None) 893# original_dict.pop('_autonomous_writer_future_stop_requessted', None) 894# self.__dict__.update(original_dict) 895 896# def optimized_write(self, data): 897# # self._output_to_client.add_piece_of_data(data) 898# self.write(data) 899 900# def owrite(self, data): 901# return self.optimized_write(data) 902 903# async def partial_drain(self): 904# await self.drain() 905# # another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 906# # while another_piece_of_data: 907# # self.write(another_piece_of_data) 908# # await self.drain() 909# # another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 910 911# async def pdrain(self): 912# return await self.partial_drain() 913 914# async def full_drain(self): 915# await self.pdrain() 916# rest_of_the_data_size = self._output_to_client.size() 917# if rest_of_the_data_size: 918# another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size) 919# self.write(another_piece_of_data) 920# await self.drain() 921 922# async def fdrain(self): 923# return await self.full_drain() 924 925# def _release_autonomous_writer_waiters(self): 926# current_size = self._output_to_client.size() 927# autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures 928# self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)() 929# for item in autonomous_writer_drain_enough_futures_buff: 930# lower_water_size, future = item 931# if current_size < lower_water_size: 932# if (not future.done()) and (not future.cancelled()): 933# future.set_result(None) 934 935# if (not future.done()) and (not future.cancelled()): 936# self._autonomous_writer_drain_enough_futures.append(item) 937 938# async def _autonomous_writer_impl(self): 939# ty = TimedYield(0) 940# while not self._autonomous_writer_future_stop_requessted: 941# another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 942# self._release_autonomous_writer_waiters() 943# while another_piece_of_data: 944# self.write(another_piece_of_data) 945# await self.drain() 946# another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 947# self._release_autonomous_writer_waiters() 948 949# await ty() 950 951# def start_autonomous_writer(self): 952# self._autonomous_writer_future = create_task(self._autonomous_writer_impl) 953 954# def start_aw(self): 955# return self.start_autonomous_writer() 956 957# async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 958# """_summary_ 959 960# Args: 961# timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 962 963# Returns: 964# _type_: _description_ 965# """ 966# result = None 967# if timeout is None: 968# timeout = self._stream_manager.autonomous_writer_stop_default_timeout 969 970# if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 971# self._autonomous_writer_future_stop_requessted = True 972# if timeout: 973# result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 974# else: 975# result = await self._autonomous_writer_future 976 977# self._autonomous_writer_future = None 978# self._autonomous_writer_future_stop_requessted = False 979 980# return result 981 982# def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 983# """_summary_ 984 985# Args: 986# timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 987 988# Returns: 989# _type_: _description_ 990# """ 991# return self.stop_autonomous_writer(timeout) 992 993# async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 994# if lower_water_size is None: 995# lower_water_size = self.socket_write_fixed_buffer_size.value * 2 996 997# if lower_water_size <= self._output_to_client.size(): 998# future: Future = self._loop.create_future() 999# self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 1000# await future 1001 1002 1003# async def aw_drain_enough(self): 1004# await self.autonomous_writer_drain_enough() 1005 1006 1007class TcpStreamWriter(OriginalStreamWriter, StreamWriterAbstract): 1008 def __init__(self, *args, **kwargs) -> None: 1009 self._bind_to_stream_manager(*args, **kwargs) 1010 1011 def _bind_to_stream_manager(self, *args, **kwargs) -> None: 1012 super().__init__(*args, **kwargs) 1013 1014 self._stream_manager: TcpStreamManager = self._protocol._stream_manager 1015 self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 1016 external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 1017 self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 1018 self._autonomous_writer_future: Task = None 1019 self._autonomous_writer_future_stop_requessted: bool = False 1020 self._autonomous_writer_drain_enough_futures: List[Future] = list() 1021 1022 def optimized_write(self, data): 1023 self._output_to_client.add_piece_of_data(data) 1024 # self.write(data) 1025 1026 def owrite(self, data): 1027 return self.optimized_write(data) 1028 1029 async def partial_drain(self): 1030 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1031 while another_piece_of_data: 1032 self.write(another_piece_of_data) 1033 await self.drain() 1034 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1035 1036 async def pdrain(self): 1037 return await self.partial_drain() 1038 1039 async def full_drain(self): 1040 await self.pdrain() 1041 rest_of_the_data_size = self._output_to_client.size() 1042 if rest_of_the_data_size: 1043 another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size) 1044 self.write(another_piece_of_data) 1045 await self.drain() 1046 1047 async def fdrain(self): 1048 return await self.full_drain() 1049 1050 def _release_autonomous_writer_waiters(self): 1051 current_size = self._output_to_client.size() 1052 autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures 1053 self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)() 1054 for item in autonomous_writer_drain_enough_futures_buff: 1055 lower_water_size, future = item 1056 if current_size < lower_water_size: 1057 if (not future.done()) and (not future.cancelled()): 1058 future.set_result(None) 1059 1060 if (not future.done()) and (not future.cancelled()): 1061 self._autonomous_writer_drain_enough_futures.append(item) 1062 1063 async def _autonomous_writer_impl(self): 1064 ty = TimedYield(0) 1065 while not self._autonomous_writer_future_stop_requessted: 1066 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1067 self._release_autonomous_writer_waiters() 1068 while another_piece_of_data: 1069 self.write(another_piece_of_data) 1070 await self.drain() 1071 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1072 self._release_autonomous_writer_waiters() 1073 1074 await ty() 1075 1076 def start_autonomous_writer(self): 1077 self._autonomous_writer_future = create_task(self._autonomous_writer_impl) 1078 1079 def start_aw(self): 1080 return self.start_autonomous_writer() 1081 1082 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 1083 """_summary_ 1084 1085 Args: 1086 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1087 1088 Returns: 1089 _type_: _description_ 1090 """ 1091 result = None 1092 if timeout is None: 1093 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 1094 1095 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 1096 self._autonomous_writer_future_stop_requessted = True 1097 if timeout: 1098 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 1099 else: 1100 result = await self._autonomous_writer_future 1101 1102 self._autonomous_writer_future = None 1103 self._autonomous_writer_future_stop_requessted = False 1104 1105 return result 1106 1107 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 1108 """_summary_ 1109 1110 Args: 1111 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1112 1113 Returns: 1114 _type_: _description_ 1115 """ 1116 return await self.stop_autonomous_writer(timeout) 1117 1118 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 1119 if lower_water_size is None: 1120 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 1121 # print(f'lower_water_size: {lower_water_size}') 1122 # lower_water_size = cpu_info().l3_cache_size 1123 1124 if lower_water_size <= self._output_to_client.size(): 1125 future: Future = self._loop.create_future() 1126 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 1127 await future 1128 1129 async def aw_drain_enough(self): 1130 await self.autonomous_writer_drain_enough() 1131 1132 def optimized_write_message(self, data): 1133 self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings.message_size_len, 'little') + data) 1134 1135 def owrite_message(self, data): 1136 self.optimized_write_message(data) 1137 1138 async def send_message(self, data): 1139 self.optimized_write_message(data) 1140 await self.fdrain() 1141 1142 @asynccontextmanager 1143 async def autonomous_writer(self): 1144 self.start_autonomous_writer() 1145 try: 1146 yield 1147 finally: 1148 await self.stop_autonomous_writer() 1149 1150 aw = autonomous_writer 1151 1152 def __enter__(self): 1153 pass 1154 1155 def __exit__(self, exc_type, exc, tb): 1156 self.close()
49class StreamType(Enum): 50 general = 0 51 message_based_anonymous = 1 52 message_based_names = 2 53 gate = 3
An enumeration.
Inherited Members
- enum.Enum
- name
- value
56class GateSecurityPolicy(Enum): 57 # gate will allow only or ban selected stream names to conect to this gate 58 allowed = 0 59 disabled = 1
An enumeration.
Inherited Members
- enum.Enum
- name
- value
62class StreamManagerIOCoreMemoryManagement(IOCoreMemoryManagement): 63 def __init__(self): 64 super(StreamManagerIOCoreMemoryManagement, self).__init__() 65 66 self.socket_write_fixed_buffer_size = ValueExistence(True, 67 cpu_info().l2_cache_size_per_virtual_core) 68 69 def link_to(self, parent): 70 super(StreamManagerIOCoreMemoryManagement, self).link_to(parent) 71 try: 72 self.socket_write_fixed_buffer_size = parent.socket_write_fixed_buffer_size 73 except AttributeError: 74 pass
Inherited Members
- cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement
- global__data_size_limit
- global__data_full_size
- global__deletable_data_full_size
- global_other__data_size_limit
- global_other__data_full_size
- global_other__deletable_data_full_size
- global_in__data_size_limit
- global_in__data_full_size
- global_in__deletable_data_full_size
- global_out__data_size_limit
- global_out__data_full_size
- global_out__deletable_data_full_size
83class TcpStreamManager(StreamManagerAbstract): 84 def __init__(self) -> None: 85 self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement() 86 self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0 87 self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl 88 self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl 89 90 async def open_connection(self, host=None, port=None, *, 91 loop=None, limit=DEFAULT_LIMIT, 92 stream_type: StreamType = StreamType.general, stream_name: str = str(), 93 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 94 max_message_size_len: Optional[int] = None, 95 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 96 """A wrapper for create_connection() returning a (reader, writer) pair. 97 98 The reader returned is a TcpStreamReader instance; the writer is a 99 TcpStreamWriter instance. 100 101 The arguments are all the usual arguments to create_connection() 102 except protocol_factory; most common are positional host and port, 103 with various optional keyword arguments following. 104 105 Additional optional keyword arguments are loop (to set the event loop 106 instance to use) and limit (to set the buffer limit passed to the 107 TcpStreamReader). 108 109 (If you want to customize the TcpStreamReader and/or 110 TcpStreamReaderProtocol classes, just copy the code -- there's 111 really nothing special here except some convenience.) 112 """ 113 if StreamType.gate == stream_type: 114 raise ValueError(f'Wrong stream_type value: client can not be a "gate".') 115 116 if loop is None: 117 loop = events.get_event_loop() 118 else: 119 warnings.warn("The loop argument is deprecated since Python 3.8, " 120 "and scheduled for removal in Python 3.10.", 121 DeprecationWarning, stacklevel=2) 122 123 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 124 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 125 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop) 126 transport, _ = await loop.create_connection( 127 lambda: protocol, host, port, **kwds) 128 writer = TcpStreamWriter(transport, protocol, reader, loop) 129 return reader, writer 130 131 def bind_existing_connection(self, reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 132 stream_type: StreamType = StreamType.general, stream_name: str = str(), 133 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 134 max_message_size_len: Optional[int] = None, 135 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 136 reader = reinterpret_cast(TcpStreamReader, reader) 137 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 138 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 139 140 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 141 protocol._bind_to_stream_manager(message_protocol_settings, reader, loop=loop) 142 143 transport = writer._transport 144 if isinstance(transport, _SSLProtocolTransport): 145 transport = cast(_SSLProtocolTransport, transport) 146 ssl_protocol: SSLProtocol = transport._ssl_protocol 147 ssl_protocol._set_app_protocol(protocol) 148 elif isinstance(transport, _ProactorSocketTransport): 149 transport = cast(_ProactorSocketTransport, transport) 150 transport.set_protocol(protocol) 151 elif isinstance(transport, _SelectorSocketTransport): 152 transport = cast(_SelectorSocketTransport, transport) 153 transport.set_protocol(protocol) 154 else: 155 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 156 157 writer = reinterpret_cast(TcpStreamWriter, writer) 158 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 159 160 return reader, writer 161 162 async def start_server(self, client_connected_cb, host=None, port=None, *, 163 loop=None, limit=DEFAULT_LIMIT, 164 stream_type: StreamType = StreamType.general, stream_name: str = str(), 165 gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None, 166 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 167 max_message_size_len: Optional[int] = None, 168 **kwds): 169 """Start a socket server, call back for each client connected. 170 171 The first parameter, `client_connected_cb`, takes two parameters: 172 client_reader, client_writer. client_reader is a TcpStreamReader 173 object, while client_writer is a TcpStreamWriter object. This 174 parameter can either be a plain callback function or a coroutine; 175 if it is a coroutine, it will be automatically converted into a 176 Task. 177 178 The rest of the arguments are all the usual arguments to 179 loop.create_server() except protocol_factory; most common are 180 positional host and port, with various optional keyword arguments 181 following. The return value is the same as loop.create_server(). 182 183 Additional optional keyword arguments are loop (to set the event loop 184 instance to use) and limit (to set the buffer limit passed to the 185 TcpStreamReader). 186 187 The return value is the same as loop.create_server(), i.e. a 188 Server object which can be used to stop the service. 189 """ 190 if loop is None: 191 loop = events.get_event_loop() 192 else: 193 warnings.warn("The loop argument is deprecated since Python 3.8, " 194 "and scheduled for removal in Python 3.10.", 195 DeprecationWarning, stacklevel=2) 196 197 def factory(): 198 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 199 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 200 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb, 201 loop=loop) 202 return protocol 203 204 return await loop.create_server(factory, host, port, **kwds) 205 206 def bind_accepted_connection(self, 207 reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 208 stream_type: StreamType = StreamType.general, stream_name: str = str(), 209 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 210 max_message_size_len: Optional[int] = None, 211 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 212 reader = reinterpret_cast(TcpStreamReader, reader) 213 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 214 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 215 216 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 217 client_connected_cb = protocol._client_connected_cb 218 protocol._bind_to_stream_manager(message_protocol_settings, reader, client_connected_cb, loop=loop) 219 220 transport = writer._transport 221 if isinstance(transport, _SSLProtocolTransport): 222 transport = cast(_SSLProtocolTransport, transport) 223 ssl_protocol: SSLProtocol = transport._ssl_protocol 224 ssl_protocol._set_app_protocol(protocol) 225 elif isinstance(transport, _ProactorSocketTransport): 226 transport = cast(_ProactorSocketTransport, transport) 227 transport.set_protocol(protocol) 228 elif isinstance(transport, _SelectorSocketTransport): 229 transport = cast(_SelectorSocketTransport, transport) 230 transport.set_protocol(protocol) 231 else: 232 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 233 234 writer = reinterpret_cast(TcpStreamWriter, writer) 235 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 236 237 return reader, writer 238 239 async def try_establish_message_protocol_server_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 240 message_size_len_encoded = await reader.readonly_exactly(1) 241 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 242 can_be_established: bool = False 243 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 244 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 245 message_size = int.from_bytes(message_size_encoded, 'little') 246 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 247 if message == reader._message_protocol_settings.protocol_greeting: 248 can_be_established = True 249 250 if can_be_established: 251 reader._message_protocol_settings.message_size_len = message_size_len 252 writer._protocol._message_protocol_settings.message_size_len = message_size_len 253 await reader.readexactly(1) 254 await reader.read_message() 255 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 256 writer.write(message) 257 await writer.full_drain() 258 return True 259 else: 260 return False 261 262 async def try_establish_message_protocol_client_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 263 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 264 writer.write(message) 265 await writer.full_drain() 266 message_size_len_encoded = await reader.readonly_exactly(1) 267 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 268 can_be_established: bool = False 269 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 270 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 271 message_size = int.from_bytes(message_size_encoded, 'little') 272 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 273 if message == reader._message_protocol_settings.protocol_greeting: 274 can_be_established = True 275 276 if can_be_established: 277 reader._message_protocol_settings.message_size_len = message_size_len 278 writer._protocol._message_protocol_settings.message_size_len = message_size_len 279 await reader.readexactly(1) 280 await reader.read_message() 281 return True 282 else: 283 return False
90 async def open_connection(self, host=None, port=None, *, 91 loop=None, limit=DEFAULT_LIMIT, 92 stream_type: StreamType = StreamType.general, stream_name: str = str(), 93 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 94 max_message_size_len: Optional[int] = None, 95 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 96 """A wrapper for create_connection() returning a (reader, writer) pair. 97 98 The reader returned is a TcpStreamReader instance; the writer is a 99 TcpStreamWriter instance. 100 101 The arguments are all the usual arguments to create_connection() 102 except protocol_factory; most common are positional host and port, 103 with various optional keyword arguments following. 104 105 Additional optional keyword arguments are loop (to set the event loop 106 instance to use) and limit (to set the buffer limit passed to the 107 TcpStreamReader). 108 109 (If you want to customize the TcpStreamReader and/or 110 TcpStreamReaderProtocol classes, just copy the code -- there's 111 really nothing special here except some convenience.) 112 """ 113 if StreamType.gate == stream_type: 114 raise ValueError(f'Wrong stream_type value: client can not be a "gate".') 115 116 if loop is None: 117 loop = events.get_event_loop() 118 else: 119 warnings.warn("The loop argument is deprecated since Python 3.8, " 120 "and scheduled for removal in Python 3.10.", 121 DeprecationWarning, stacklevel=2) 122 123 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 124 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 125 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop) 126 transport, _ = await loop.create_connection( 127 lambda: protocol, host, port, **kwds) 128 writer = TcpStreamWriter(transport, protocol, reader, loop) 129 return reader, writer
A wrapper for create_connection() returning a (reader, writer) pair.
The reader returned is a TcpStreamReader instance; the writer is a TcpStreamWriter instance.
The arguments are all the usual arguments to create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.
Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the TcpStreamReader).
(If you want to customize the TcpStreamReader and/or TcpStreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.)
131 def bind_existing_connection(self, reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 132 stream_type: StreamType = StreamType.general, stream_name: str = str(), 133 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 134 max_message_size_len: Optional[int] = None, 135 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 136 reader = reinterpret_cast(TcpStreamReader, reader) 137 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 138 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 139 140 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 141 protocol._bind_to_stream_manager(message_protocol_settings, reader, loop=loop) 142 143 transport = writer._transport 144 if isinstance(transport, _SSLProtocolTransport): 145 transport = cast(_SSLProtocolTransport, transport) 146 ssl_protocol: SSLProtocol = transport._ssl_protocol 147 ssl_protocol._set_app_protocol(protocol) 148 elif isinstance(transport, _ProactorSocketTransport): 149 transport = cast(_ProactorSocketTransport, transport) 150 transport.set_protocol(protocol) 151 elif isinstance(transport, _SelectorSocketTransport): 152 transport = cast(_SelectorSocketTransport, transport) 153 transport.set_protocol(protocol) 154 else: 155 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 156 157 writer = reinterpret_cast(TcpStreamWriter, writer) 158 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 159 160 return reader, writer
162 async def start_server(self, client_connected_cb, host=None, port=None, *, 163 loop=None, limit=DEFAULT_LIMIT, 164 stream_type: StreamType = StreamType.general, stream_name: str = str(), 165 gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None, 166 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 167 max_message_size_len: Optional[int] = None, 168 **kwds): 169 """Start a socket server, call back for each client connected. 170 171 The first parameter, `client_connected_cb`, takes two parameters: 172 client_reader, client_writer. client_reader is a TcpStreamReader 173 object, while client_writer is a TcpStreamWriter object. This 174 parameter can either be a plain callback function or a coroutine; 175 if it is a coroutine, it will be automatically converted into a 176 Task. 177 178 The rest of the arguments are all the usual arguments to 179 loop.create_server() except protocol_factory; most common are 180 positional host and port, with various optional keyword arguments 181 following. The return value is the same as loop.create_server(). 182 183 Additional optional keyword arguments are loop (to set the event loop 184 instance to use) and limit (to set the buffer limit passed to the 185 TcpStreamReader). 186 187 The return value is the same as loop.create_server(), i.e. a 188 Server object which can be used to stop the service. 189 """ 190 if loop is None: 191 loop = events.get_event_loop() 192 else: 193 warnings.warn("The loop argument is deprecated since Python 3.8, " 194 "and scheduled for removal in Python 3.10.", 195 DeprecationWarning, stacklevel=2) 196 197 def factory(): 198 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 199 reader = TcpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 200 protocol = TcpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb, 201 loop=loop) 202 return protocol 203 204 return await loop.create_server(factory, host, port, **kwds)
Start a socket server, call back for each client connected.
The first parameter, client_connected_cb
, takes two parameters:
client_reader, client_writer. client_reader is a TcpStreamReader
object, while client_writer is a TcpStreamWriter object. This
parameter can either be a plain callback function or a coroutine;
if it is a coroutine, it will be automatically converted into a
Task.
The rest of the arguments are all the usual arguments to loop.create_server() except protocol_factory; most common are positional host and port, with various optional keyword arguments following. The return value is the same as loop.create_server().
Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the TcpStreamReader).
The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service.
206 def bind_accepted_connection(self, 207 reader: OriginalStreamReader, writer: OriginalStreamWriter, *, loop=None, limit=DEFAULT_LIMIT, 208 stream_type: StreamType = StreamType.general, stream_name: str = str(), 209 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 210 max_message_size_len: Optional[int] = None, 211 **kwds) -> Tuple['TcpStreamReader', 'TcpStreamWriter']: 212 reader = reinterpret_cast(TcpStreamReader, reader) 213 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 214 reader._bind_to_stream_manager(self, message_protocol_settings, limit=limit, loop=loop) 215 216 protocol = reinterpret_cast(TcpStreamReaderProtocol, writer._protocol) 217 client_connected_cb = protocol._client_connected_cb 218 protocol._bind_to_stream_manager(message_protocol_settings, reader, client_connected_cb, loop=loop) 219 220 transport = writer._transport 221 if isinstance(transport, _SSLProtocolTransport): 222 transport = cast(_SSLProtocolTransport, transport) 223 ssl_protocol: SSLProtocol = transport._ssl_protocol 224 ssl_protocol._set_app_protocol(protocol) 225 elif isinstance(transport, _ProactorSocketTransport): 226 transport = cast(_ProactorSocketTransport, transport) 227 transport.set_protocol(protocol) 228 elif isinstance(transport, _SelectorSocketTransport): 229 transport = cast(_SelectorSocketTransport, transport) 230 transport.set_protocol(protocol) 231 else: 232 raise RuntimeError(f'Unsupported transport type: {type(transport)}') 233 234 writer = reinterpret_cast(TcpStreamWriter, writer) 235 writer._bind_to_stream_manager(self, transport, protocol, reader, loop) 236 237 return reader, writer
239 async def try_establish_message_protocol_server_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 240 message_size_len_encoded = await reader.readonly_exactly(1) 241 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 242 can_be_established: bool = False 243 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 244 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 245 message_size = int.from_bytes(message_size_encoded, 'little') 246 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 247 if message == reader._message_protocol_settings.protocol_greeting: 248 can_be_established = True 249 250 if can_be_established: 251 reader._message_protocol_settings.message_size_len = message_size_len 252 writer._protocol._message_protocol_settings.message_size_len = message_size_len 253 await reader.readexactly(1) 254 await reader.read_message() 255 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 256 writer.write(message) 257 await writer.full_drain() 258 return True 259 else: 260 return False
262 async def try_establish_message_protocol_client_side(self, reader: 'TcpStreamReader', writer: 'TcpStreamWriter') -> bool: 263 message = reader._message_protocol_settings.message_size_len.to_bytes(1, 'little') + len(reader._message_protocol_settings.protocol_greeting).to_bytes(reader._message_protocol_settings.message_size_len, 'little') + reader._message_protocol_settings.protocol_greeting 264 writer.write(message) 265 await writer.full_drain() 266 message_size_len_encoded = await reader.readonly_exactly(1) 267 message_size_len = int.from_bytes(message_size_len_encoded, 'little') 268 can_be_established: bool = False 269 if message_size_len <= reader._message_protocol_settings.max_message_size_len: 270 message_size_encoded = (await reader.readonly_exactly(1 + message_size_len))[1:] 271 message_size = int.from_bytes(message_size_encoded, 'little') 272 message = (await reader.readonly_exactly(1 + message_size_len + message_size))[1 + message_size_len:] 273 if message == reader._message_protocol_settings.protocol_greeting: 274 can_be_established = True 275 276 if can_be_established: 277 reader._message_protocol_settings.message_size_len = message_size_len 278 writer._protocol._message_protocol_settings.message_size_len = message_size_len 279 await reader.readexactly(1) 280 await reader.read_message() 281 return True 282 else: 283 return False
377class TcpStreamReader(OriginalStreamReader, StreamReaderAbstract): 378 def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 379 self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs) 380 381 def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 382 super().__init__(*args, **kwargs) 383 self._stream_manager = manager 384 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 385 self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type( 386 external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size) 387 self.recv_buff_size_computer = RecvBuffSizeComputer() 388 cpu_info_inst = cpu_info() 389 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 390 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 391 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 392 # self.recv_buff_size_computer.max_recv_buff_size = 3145728 393 self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 394 # self.recv_buff_size_computer.max_recv_buff_size = 1024 395 # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 396 self.limit_by_limit: bool = True 397 self.limit_by_global_in__data_size_limit: bool = True 398 399 async def read_max(self): 400 return await self.read(self._limit) 401 402 async def read_nearly_max(self): 403 return await self.read_nearly(self._limit) 404 405 async def read_with_counter(self): 406 if self._exception is not None: 407 raise self._exception 408 409 # This used to just loop creating a new waiter hoping to 410 # collect everything in self._buffer, but that would 411 # deadlock if the subprocess sends more than self.limit 412 # bytes. So just call self.read(self._limit) until EOF. 413 blocks = [] 414 counter = 0 415 while True: 416 block = await self.read_max() 417 counter += 1 418 if not block: 419 break 420 blocks.append(block) 421 return b''.join(blocks), counter 422 423 def __repr__(self): 424 info = ['TcpStreamReader'] 425 if self._smart_buffer.size(): 426 info.append(f'{self._smart_buffer.size()} bytes') 427 if self._eof: 428 info.append('eof') 429 if self._limit != DEFAULT_LIMIT: 430 info.append(f'limit={self._limit}') 431 if self._waiter: 432 info.append(f'waiter={self._waiter!r}') 433 if self._exception: 434 info.append(f'exception={self._exception!r}') 435 if self._transport: 436 info.append(f'transport={self._transport!r}') 437 if self._paused: 438 info.append('paused') 439 return '<{}>'.format(' '.join(info)) 440 441 def _maybe_resume_transport(self): 442 if isinstance(self._transport, ( 443 proactor_events._ProactorDatagramTransport, 444 selector_events._SelectorTransport, 445 unix_events._UnixReadPipeTransport 446 )): 447 # if hasattr(self._transport, 'max_size'): 448 try: 449 self._transport.max_size = self.recv_buff_size_computer.recv_buff_size 450 # print(f'max_size: {self._transport.max_size}') 451 except AttributeError: 452 pass 453 else: 454 print(f'Unsupported transport: {type(self._transport)}') 455 456 if self._paused \ 457 and ( 458 ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \ 459 or (self.limit_by_limit and (not self._limit)) \ 460 or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \ 461 or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \ 462 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value)) 463 ): 464 self._paused = False 465 self._transport.resume_reading() 466 467 def at_eof(self): 468 """Return True if the buffer is empty and 'feed_eof' was called.""" 469 return self._eof and not self._smart_buffer.size() 470 471 def feed_data(self, data): 472 assert not self._eof, 'feed_data after feed_eof' 473 474 if not data: 475 return 476 477 data_len = len(data) 478 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 479 self._smart_buffer.add_piece_of_data(data) 480 self._wakeup_waiter() 481 482 if (self._transport is not None and 483 not self._paused 484 and ( 485 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 486 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 487 )): 488 try: 489 self._transport.pause_reading() 490 except NotImplementedError: 491 # The transport can't be paused. 492 # We'll just have to buffer all data. 493 # Forget the transport so we don't keep trying. 494 self._transport = None 495 else: 496 self._paused = True 497 498 async def readline(self): 499 """Read chunk of data from the stream until newline (b'\n') is found. 500 501 On success, return chunk that ends with newline. If only partial 502 line can be read due to EOF, return incomplete line without 503 terminating newline. When EOF was reached while no bytes read, empty 504 bytes object is returned. 505 506 If limit is reached, ValueError will be raised. In that case, if 507 newline was found, complete line including newline will be removed 508 from internal buffer. Else, internal buffer will be cleared. Limit is 509 compared against part of the line without newline. 510 511 If stream was paused, this function will automatically resume it if 512 needed. 513 """ 514 sep = b'\n' 515 seplen = len(sep) 516 try: 517 line = await self.readuntil(sep) 518 except IncompleteReadError as e: 519 return e.partial 520 except LimitOverrunError as e: 521 if self._smart_buffer.startswith(sep, e.consumed): 522 self._smart_buffer.get_data(e.consumed + seplen) 523 else: 524 self._smart_buffer.clear() 525 526 self._maybe_resume_transport() 527 raise ValueError(e.args[0]) 528 return line 529 530 async def readuntil(self, separator=b'\n'): 531 """Read data from the stream until ``separator`` is found. 532 533 On success, the data and separator will be removed from the 534 internal buffer (consumed). Returned data will include the 535 separator at the end. 536 537 Configured stream limit is used to check result. Limit sets the 538 maximal length of data that can be returned, not counting the 539 separator. 540 541 If an EOF occurs and the complete separator is still not found, 542 an IncompleteReadError exception will be raised, and the internal 543 buffer will be reset. The IncompleteReadError.partial attribute 544 may contain the separator partially. 545 546 If the data cannot be read because of over limit, a 547 LimitOverrunError exception will be raised, and the data 548 will be left in the internal buffer, so it can be read again. 549 """ 550 seplen = len(separator) 551 if seplen == 0: 552 raise ValueError('Separator should be at least one-byte string') 553 554 if self._exception is not None: 555 raise self._exception 556 557 # Consume whole buffer except last bytes, which length is 558 # one less than seplen. Let's check corner cases with 559 # separator='SEPARATOR': 560 # * we have received almost complete separator (without last 561 # byte). i.e buffer='some textSEPARATO'. In this case we 562 # can safely consume len(separator) - 1 bytes. 563 # * last byte of buffer is first byte of separator, i.e. 564 # buffer='abcdefghijklmnopqrS'. We may safely consume 565 # everything except that last byte, but this require to 566 # analyze bytes of buffer that match partial separator. 567 # This is slow and/or require FSM. For this case our 568 # implementation is not optimal, since require rescanning 569 # of data that is known to not belong to separator. In 570 # real world, separator will not be so long to notice 571 # performance problems. Even when reading MIME-encoded 572 # messages :) 573 574 # `offset` is the number of bytes from the beginning of the buffer 575 # where there is no occurrence of `separator`. 576 offset = 0 577 578 # Loop until we find `separator` in the buffer, exceed the buffer size, 579 # or an EOF has happened. 580 while True: 581 buflen = self._smart_buffer.size() 582 583 # Check if we now have enough data in the buffer for `separator` to 584 # fit. 585 if buflen - offset >= seplen: 586 isep = self._smart_buffer.find(separator, offset) 587 588 if isep != -1: 589 # `separator` is in the buffer. `isep` will be used later 590 # to retrieve the data. 591 break 592 593 # see upper comment for explanation. 594 offset = buflen + 1 - seplen 595 if offset > self._limit: 596 raise LimitOverrunError( 597 'Separator is not found, and chunk exceed the limit', 598 offset) 599 600 # Complete message (with full separator) may be present in buffer 601 # even when EOF flag is set. This may happen when the last chunk 602 # adds data which makes separator be found. That's why we check for 603 # EOF *ater* inspecting the buffer. 604 if self._eof: 605 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 606 raise IncompleteReadError(chunk, None) 607 608 # _wait_for_data() will resume reading if stream was paused. 609 await self._wait_for_data('readuntil') 610 611 if isep > self._limit: 612 raise LimitOverrunError( 613 'Separator is found, but chunk is longer than limit', isep) 614 615 chunk = self._smart_buffer.get_data(isep + seplen) 616 self._maybe_resume_transport() 617 return bytes(chunk) 618 619 async def read(self, n=-1): 620 """Read up to `n` bytes from the stream. 621 622 If n is not provided, or set to -1, read until EOF and return all read 623 bytes. If the EOF was received and the internal buffer is empty, return 624 an empty bytes object. 625 626 If n is zero, return empty bytes object immediately. 627 628 If n is positive, this function try to read `n` bytes, and may return 629 less or equal bytes than requested, but at least one byte. If EOF was 630 received before any byte is read, this function returns empty byte 631 object. 632 633 Returned value is not limited with limit, configured at stream 634 creation. 635 636 If stream was paused, this function will automatically resume it if 637 needed. 638 """ 639 640 if self._exception is not None: 641 raise self._exception 642 643 if n == 0: 644 return b'' 645 646 if n < 0: 647 # This used to just loop creating a new waiter hoping to 648 # collect everything in self._buffer, but that would 649 # deadlock if the subprocess sends more than self.limit 650 # bytes. So just call self.read(self._limit) until EOF. 651 blocks = [] 652 while True: 653 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 654 if not block: 655 break 656 blocks.append(block) 657 return b''.join(blocks) 658 659 if not self._smart_buffer.size() and not self._eof: 660 await self._wait_for_data('read') 661 662 # This will work right even if buffer is less than n bytes 663 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 664 665 self._maybe_resume_transport() 666 return data 667 668 async def read_nearly(self, n=-1): 669 """Read up to `n` bytes from the stream. 670 671 If n is not provided, or set to -1, read until EOF and return all read 672 bytes. If the EOF was received and the internal buffer is empty, return 673 an empty bytes object. 674 675 If n is zero, return empty bytes object immediately. 676 677 If n is positive, this function try to read `n` bytes, and may return 678 less or equal bytes than requested, but at least one byte. If EOF was 679 received before any byte is read, this function returns empty byte 680 object. 681 682 Returned value is not limited with limit, configured at stream 683 creation. 684 685 If stream was paused, this function will automatically resume it if 686 needed. 687 """ 688 689 if self._exception is not None: 690 raise self._exception 691 692 if n == 0: 693 return b'' 694 695 if n < 0: 696 # This used to just loop creating a new waiter hoping to 697 # collect everything in self._buffer, but that would 698 # deadlock if the subprocess sends more than self.limit 699 # bytes. So just call self.read(self._limit) until EOF. 700 blocks = [] 701 while True: 702 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 703 if not block: 704 break 705 blocks.append(block) 706 return b''.join(blocks) 707 708 if not self._smart_buffer.size() and not self._eof: 709 await self._wait_for_data('read') 710 711 # This will work right even if buffer is less than n bytes 712 data = self._smart_buffer.get_data_nearly(n) 713 714 self._maybe_resume_transport() 715 return data 716 717 async def readexactly(self, n): 718 """Read exactly `n` bytes. 719 720 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 721 read. The IncompleteReadError.partial attribute of the exception will 722 contain the partial read bytes. 723 724 if n is zero, return empty bytes object. 725 726 Returned value is not limited with limit, configured at stream 727 creation. 728 729 If stream was paused, this function will automatically resume it if 730 needed. 731 """ 732 if n < 0: 733 raise ValueError('readexactly size can not be less than zero') 734 735 if self._exception is not None: 736 raise self._exception 737 738 if n == 0: 739 return b'' 740 741 while self._smart_buffer.size() < n: 742 if self._eof: 743 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 744 raise IncompleteReadError(incomplete, n) 745 746 await self._wait_for_data('readexactly') 747 748 if self._smart_buffer.size() == n: 749 data = self._smart_buffer.get_data(self._smart_buffer.size()) 750 else: 751 data = self._smart_buffer.get_data(n) 752 753 self._maybe_resume_transport() 754 return data 755 756 async def readonly_exactly(self, n): 757 if n < 0: 758 raise ValueError('readexactly size can not be less than zero') 759 760 if self._exception is not None: 761 raise self._exception 762 763 if n == 0: 764 return b'' 765 766 while self._smart_buffer.size() < n: 767 if self._eof: 768 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 769 raise IncompleteReadError(incomplete, n) 770 771 await self._wait_for_data('readexactly') 772 773 if self._smart_buffer.size() == n: 774 data = self._smart_buffer.read_data(self._smart_buffer.size()) 775 else: 776 data = self._smart_buffer.read_data(n) 777 778 self._maybe_resume_transport() 779 return data 780 781 async def read_message(self): 782 message_len_encoded = await self.readexactly(self._message_protocol_settings.message_size_len) 783 message_len = int.from_bytes(message_len_encoded, 'little') 784 return await self.readexactly(message_len) 785 786 def message_awailable(self) -> bool: 787 message_size_len = self._message_protocol_settings.message_size_len 788 if self._smart_buffer.size() < message_size_len: 789 return False 790 791 message_len_encoded = self._smart_buffer.get_data(message_size_len) 792 message_len = int.from_bytes(message_len_encoded, 'little') 793 if self._smart_buffer.size() < (message_size_len + message_len): 794 return False 795 796 return True 797 798 def transport_pause_reading(self): 799 try: 800 self._transport.pause_reading() 801 except NotImplementedError: 802 # The transport can't be paused. 803 # We'll just have to buffer all data. 804 # Forget the transport so we don't keep trying. 805 pass 806 else: 807 self._paused = True 808 809 def transport_resume_reading(self): 810 self._paused = False 811 self._transport.resume_reading()
405 async def read_with_counter(self): 406 if self._exception is not None: 407 raise self._exception 408 409 # This used to just loop creating a new waiter hoping to 410 # collect everything in self._buffer, but that would 411 # deadlock if the subprocess sends more than self.limit 412 # bytes. So just call self.read(self._limit) until EOF. 413 blocks = [] 414 counter = 0 415 while True: 416 block = await self.read_max() 417 counter += 1 418 if not block: 419 break 420 blocks.append(block) 421 return b''.join(blocks), counter
467 def at_eof(self): 468 """Return True if the buffer is empty and 'feed_eof' was called.""" 469 return self._eof and not self._smart_buffer.size()
Return True if the buffer is empty and 'feed_eof' was called.
471 def feed_data(self, data): 472 assert not self._eof, 'feed_data after feed_eof' 473 474 if not data: 475 return 476 477 data_len = len(data) 478 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 479 self._smart_buffer.add_piece_of_data(data) 480 self._wakeup_waiter() 481 482 if (self._transport is not None and 483 not self._paused 484 and ( 485 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 486 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 487 )): 488 try: 489 self._transport.pause_reading() 490 except NotImplementedError: 491 # The transport can't be paused. 492 # We'll just have to buffer all data. 493 # Forget the transport so we don't keep trying. 494 self._transport = None 495 else: 496 self._paused = True
498 async def readline(self): 499 """Read chunk of data from the stream until newline (b'\n') is found. 500 501 On success, return chunk that ends with newline. If only partial 502 line can be read due to EOF, return incomplete line without 503 terminating newline. When EOF was reached while no bytes read, empty 504 bytes object is returned. 505 506 If limit is reached, ValueError will be raised. In that case, if 507 newline was found, complete line including newline will be removed 508 from internal buffer. Else, internal buffer will be cleared. Limit is 509 compared against part of the line without newline. 510 511 If stream was paused, this function will automatically resume it if 512 needed. 513 """ 514 sep = b'\n' 515 seplen = len(sep) 516 try: 517 line = await self.readuntil(sep) 518 except IncompleteReadError as e: 519 return e.partial 520 except LimitOverrunError as e: 521 if self._smart_buffer.startswith(sep, e.consumed): 522 self._smart_buffer.get_data(e.consumed + seplen) 523 else: 524 self._smart_buffer.clear() 525 526 self._maybe_resume_transport() 527 raise ValueError(e.args[0]) 528 return line
Read chunk of data from the stream until newline (b' ') is found.
On success, return chunk that ends with newline. If only partial
line can be read due to EOF, return incomplete line without
terminating newline. When EOF was reached while no bytes read, empty
bytes object is returned.
If limit is reached, ValueError will be raised. In that case, if
newline was found, complete line including newline will be removed
from internal buffer. Else, internal buffer will be cleared. Limit is
compared against part of the line without newline.
If stream was paused, this function will automatically resume it if
needed.
530 async def readuntil(self, separator=b'\n'): 531 """Read data from the stream until ``separator`` is found. 532 533 On success, the data and separator will be removed from the 534 internal buffer (consumed). Returned data will include the 535 separator at the end. 536 537 Configured stream limit is used to check result. Limit sets the 538 maximal length of data that can be returned, not counting the 539 separator. 540 541 If an EOF occurs and the complete separator is still not found, 542 an IncompleteReadError exception will be raised, and the internal 543 buffer will be reset. The IncompleteReadError.partial attribute 544 may contain the separator partially. 545 546 If the data cannot be read because of over limit, a 547 LimitOverrunError exception will be raised, and the data 548 will be left in the internal buffer, so it can be read again. 549 """ 550 seplen = len(separator) 551 if seplen == 0: 552 raise ValueError('Separator should be at least one-byte string') 553 554 if self._exception is not None: 555 raise self._exception 556 557 # Consume whole buffer except last bytes, which length is 558 # one less than seplen. Let's check corner cases with 559 # separator='SEPARATOR': 560 # * we have received almost complete separator (without last 561 # byte). i.e buffer='some textSEPARATO'. In this case we 562 # can safely consume len(separator) - 1 bytes. 563 # * last byte of buffer is first byte of separator, i.e. 564 # buffer='abcdefghijklmnopqrS'. We may safely consume 565 # everything except that last byte, but this require to 566 # analyze bytes of buffer that match partial separator. 567 # This is slow and/or require FSM. For this case our 568 # implementation is not optimal, since require rescanning 569 # of data that is known to not belong to separator. In 570 # real world, separator will not be so long to notice 571 # performance problems. Even when reading MIME-encoded 572 # messages :) 573 574 # `offset` is the number of bytes from the beginning of the buffer 575 # where there is no occurrence of `separator`. 576 offset = 0 577 578 # Loop until we find `separator` in the buffer, exceed the buffer size, 579 # or an EOF has happened. 580 while True: 581 buflen = self._smart_buffer.size() 582 583 # Check if we now have enough data in the buffer for `separator` to 584 # fit. 585 if buflen - offset >= seplen: 586 isep = self._smart_buffer.find(separator, offset) 587 588 if isep != -1: 589 # `separator` is in the buffer. `isep` will be used later 590 # to retrieve the data. 591 break 592 593 # see upper comment for explanation. 594 offset = buflen + 1 - seplen 595 if offset > self._limit: 596 raise LimitOverrunError( 597 'Separator is not found, and chunk exceed the limit', 598 offset) 599 600 # Complete message (with full separator) may be present in buffer 601 # even when EOF flag is set. This may happen when the last chunk 602 # adds data which makes separator be found. That's why we check for 603 # EOF *ater* inspecting the buffer. 604 if self._eof: 605 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 606 raise IncompleteReadError(chunk, None) 607 608 # _wait_for_data() will resume reading if stream was paused. 609 await self._wait_for_data('readuntil') 610 611 if isep > self._limit: 612 raise LimitOverrunError( 613 'Separator is found, but chunk is longer than limit', isep) 614 615 chunk = self._smart_buffer.get_data(isep + seplen) 616 self._maybe_resume_transport() 617 return bytes(chunk)
Read data from the stream until separator
is found.
On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.
Configured stream limit is used to check result. Limit sets the maximal length of data that can be returned, not counting the separator.
If an EOF occurs and the complete separator is still not found, an IncompleteReadError exception will be raised, and the internal buffer will be reset. The IncompleteReadError.partial attribute may contain the separator partially.
If the data cannot be read because of over limit, a LimitOverrunError exception will be raised, and the data will be left in the internal buffer, so it can be read again.
619 async def read(self, n=-1): 620 """Read up to `n` bytes from the stream. 621 622 If n is not provided, or set to -1, read until EOF and return all read 623 bytes. If the EOF was received and the internal buffer is empty, return 624 an empty bytes object. 625 626 If n is zero, return empty bytes object immediately. 627 628 If n is positive, this function try to read `n` bytes, and may return 629 less or equal bytes than requested, but at least one byte. If EOF was 630 received before any byte is read, this function returns empty byte 631 object. 632 633 Returned value is not limited with limit, configured at stream 634 creation. 635 636 If stream was paused, this function will automatically resume it if 637 needed. 638 """ 639 640 if self._exception is not None: 641 raise self._exception 642 643 if n == 0: 644 return b'' 645 646 if n < 0: 647 # This used to just loop creating a new waiter hoping to 648 # collect everything in self._buffer, but that would 649 # deadlock if the subprocess sends more than self.limit 650 # bytes. So just call self.read(self._limit) until EOF. 651 blocks = [] 652 while True: 653 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 654 if not block: 655 break 656 blocks.append(block) 657 return b''.join(blocks) 658 659 if not self._smart_buffer.size() and not self._eof: 660 await self._wait_for_data('read') 661 662 # This will work right even if buffer is less than n bytes 663 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 664 665 self._maybe_resume_transport() 666 return data
Read up to n
bytes from the stream.
If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.
If n is zero, return empty bytes object immediately.
If n is positive, this function try to read n
bytes, and may return
less or equal bytes than requested, but at least one byte. If EOF was
received before any byte is read, this function returns empty byte
object.
Returned value is not limited with limit, configured at stream creation.
If stream was paused, this function will automatically resume it if needed.
668 async def read_nearly(self, n=-1): 669 """Read up to `n` bytes from the stream. 670 671 If n is not provided, or set to -1, read until EOF and return all read 672 bytes. If the EOF was received and the internal buffer is empty, return 673 an empty bytes object. 674 675 If n is zero, return empty bytes object immediately. 676 677 If n is positive, this function try to read `n` bytes, and may return 678 less or equal bytes than requested, but at least one byte. If EOF was 679 received before any byte is read, this function returns empty byte 680 object. 681 682 Returned value is not limited with limit, configured at stream 683 creation. 684 685 If stream was paused, this function will automatically resume it if 686 needed. 687 """ 688 689 if self._exception is not None: 690 raise self._exception 691 692 if n == 0: 693 return b'' 694 695 if n < 0: 696 # This used to just loop creating a new waiter hoping to 697 # collect everything in self._buffer, but that would 698 # deadlock if the subprocess sends more than self.limit 699 # bytes. So just call self.read(self._limit) until EOF. 700 blocks = [] 701 while True: 702 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 703 if not block: 704 break 705 blocks.append(block) 706 return b''.join(blocks) 707 708 if not self._smart_buffer.size() and not self._eof: 709 await self._wait_for_data('read') 710 711 # This will work right even if buffer is less than n bytes 712 data = self._smart_buffer.get_data_nearly(n) 713 714 self._maybe_resume_transport() 715 return data
Read up to n
bytes from the stream.
If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.
If n is zero, return empty bytes object immediately.
If n is positive, this function try to read n
bytes, and may return
less or equal bytes than requested, but at least one byte. If EOF was
received before any byte is read, this function returns empty byte
object.
Returned value is not limited with limit, configured at stream creation.
If stream was paused, this function will automatically resume it if needed.
717 async def readexactly(self, n): 718 """Read exactly `n` bytes. 719 720 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 721 read. The IncompleteReadError.partial attribute of the exception will 722 contain the partial read bytes. 723 724 if n is zero, return empty bytes object. 725 726 Returned value is not limited with limit, configured at stream 727 creation. 728 729 If stream was paused, this function will automatically resume it if 730 needed. 731 """ 732 if n < 0: 733 raise ValueError('readexactly size can not be less than zero') 734 735 if self._exception is not None: 736 raise self._exception 737 738 if n == 0: 739 return b'' 740 741 while self._smart_buffer.size() < n: 742 if self._eof: 743 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 744 raise IncompleteReadError(incomplete, n) 745 746 await self._wait_for_data('readexactly') 747 748 if self._smart_buffer.size() == n: 749 data = self._smart_buffer.get_data(self._smart_buffer.size()) 750 else: 751 data = self._smart_buffer.get_data(n) 752 753 self._maybe_resume_transport() 754 return data
Read exactly n
bytes.
Raise an IncompleteReadError if EOF is reached before n
bytes can be
read. The IncompleteReadError.partial attribute of the exception will
contain the partial read bytes.
if n is zero, return empty bytes object.
Returned value is not limited with limit, configured at stream creation.
If stream was paused, this function will automatically resume it if needed.
756 async def readonly_exactly(self, n): 757 if n < 0: 758 raise ValueError('readexactly size can not be less than zero') 759 760 if self._exception is not None: 761 raise self._exception 762 763 if n == 0: 764 return b'' 765 766 while self._smart_buffer.size() < n: 767 if self._eof: 768 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 769 raise IncompleteReadError(incomplete, n) 770 771 await self._wait_for_data('readexactly') 772 773 if self._smart_buffer.size() == n: 774 data = self._smart_buffer.read_data(self._smart_buffer.size()) 775 else: 776 data = self._smart_buffer.read_data(n) 777 778 self._maybe_resume_transport() 779 return data
786 def message_awailable(self) -> bool: 787 message_size_len = self._message_protocol_settings.message_size_len 788 if self._smart_buffer.size() < message_size_len: 789 return False 790 791 message_len_encoded = self._smart_buffer.get_data(message_size_len) 792 message_len = int.from_bytes(message_len_encoded, 'little') 793 if self._smart_buffer.size() < (message_size_len + message_len): 794 return False 795 796 return True
Inherited Members
- asyncio.streams.StreamReader
- exception
- set_exception
- set_transport
- feed_eof
841class TcpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract): 842 def __init__(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 843 self._bind_to_stream_manager(manager, message_protocol_settings, *args, **kwargs) 844 845 def _bind_to_stream_manager(self, manager: TcpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 846 super().__init__(*args, **kwargs) 847 848 self._stream_manager = manager 849 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 850 851 def connection_made(self, transport): 852 if self._reject_connection: 853 context = { 854 'message': ('An open stream was garbage collected prior to ' 855 'establishing network connection; ' 856 'call "stream.close()" explicitly.') 857 } 858 if self._source_traceback: 859 context['source_traceback'] = self._source_traceback 860 self._loop.call_exception_handler(context) 861 transport.abort() 862 return 863 864 self._transport = transport 865 reader = self._stream_reader 866 if reader is not None: 867 reader.set_transport(transport) 868 869 self._over_ssl = transport.get_extra_info('sslcontext') is not None 870 if self._client_connected_cb is not None: 871 self._stream_writer = TcpStreamWriter(transport, self, 872 reader, 873 self._loop) 874 res = self._client_connected_cb(reader, 875 self._stream_writer) 876 if coroutines.iscoroutine(res): 877 self._loop.create_task(res) 878 879 self._strong_reader = None
Helper class to adapt between Protocol and StreamReader.
(This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader to accidentally call inappropriate methods of the protocol.)
851 def connection_made(self, transport): 852 if self._reject_connection: 853 context = { 854 'message': ('An open stream was garbage collected prior to ' 855 'establishing network connection; ' 856 'call "stream.close()" explicitly.') 857 } 858 if self._source_traceback: 859 context['source_traceback'] = self._source_traceback 860 self._loop.call_exception_handler(context) 861 transport.abort() 862 return 863 864 self._transport = transport 865 reader = self._stream_reader 866 if reader is not None: 867 reader.set_transport(transport) 868 869 self._over_ssl = transport.get_extra_info('sslcontext') is not None 870 if self._client_connected_cb is not None: 871 self._stream_writer = TcpStreamWriter(transport, self, 872 reader, 873 self._loop) 874 res = self._client_connected_cb(reader, 875 self._stream_writer) 876 if coroutines.iscoroutine(res): 877 self._loop.create_task(res) 878 879 self._strong_reader = None
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
Inherited Members
- asyncio.streams.StreamReaderProtocol
- connection_lost
- data_received
- eof_received
- asyncio.streams.FlowControlMixin
- pause_writing
- resume_writing
1008class TcpStreamWriter(OriginalStreamWriter, StreamWriterAbstract): 1009 def __init__(self, *args, **kwargs) -> None: 1010 self._bind_to_stream_manager(*args, **kwargs) 1011 1012 def _bind_to_stream_manager(self, *args, **kwargs) -> None: 1013 super().__init__(*args, **kwargs) 1014 1015 self._stream_manager: TcpStreamManager = self._protocol._stream_manager 1016 self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 1017 external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 1018 self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 1019 self._autonomous_writer_future: Task = None 1020 self._autonomous_writer_future_stop_requessted: bool = False 1021 self._autonomous_writer_drain_enough_futures: List[Future] = list() 1022 1023 def optimized_write(self, data): 1024 self._output_to_client.add_piece_of_data(data) 1025 # self.write(data) 1026 1027 def owrite(self, data): 1028 return self.optimized_write(data) 1029 1030 async def partial_drain(self): 1031 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1032 while another_piece_of_data: 1033 self.write(another_piece_of_data) 1034 await self.drain() 1035 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1036 1037 async def pdrain(self): 1038 return await self.partial_drain() 1039 1040 async def full_drain(self): 1041 await self.pdrain() 1042 rest_of_the_data_size = self._output_to_client.size() 1043 if rest_of_the_data_size: 1044 another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size) 1045 self.write(another_piece_of_data) 1046 await self.drain() 1047 1048 async def fdrain(self): 1049 return await self.full_drain() 1050 1051 def _release_autonomous_writer_waiters(self): 1052 current_size = self._output_to_client.size() 1053 autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures 1054 self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)() 1055 for item in autonomous_writer_drain_enough_futures_buff: 1056 lower_water_size, future = item 1057 if current_size < lower_water_size: 1058 if (not future.done()) and (not future.cancelled()): 1059 future.set_result(None) 1060 1061 if (not future.done()) and (not future.cancelled()): 1062 self._autonomous_writer_drain_enough_futures.append(item) 1063 1064 async def _autonomous_writer_impl(self): 1065 ty = TimedYield(0) 1066 while not self._autonomous_writer_future_stop_requessted: 1067 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1068 self._release_autonomous_writer_waiters() 1069 while another_piece_of_data: 1070 self.write(another_piece_of_data) 1071 await self.drain() 1072 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1073 self._release_autonomous_writer_waiters() 1074 1075 await ty() 1076 1077 def start_autonomous_writer(self): 1078 self._autonomous_writer_future = create_task(self._autonomous_writer_impl) 1079 1080 def start_aw(self): 1081 return self.start_autonomous_writer() 1082 1083 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 1084 """_summary_ 1085 1086 Args: 1087 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1088 1089 Returns: 1090 _type_: _description_ 1091 """ 1092 result = None 1093 if timeout is None: 1094 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 1095 1096 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 1097 self._autonomous_writer_future_stop_requessted = True 1098 if timeout: 1099 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 1100 else: 1101 result = await self._autonomous_writer_future 1102 1103 self._autonomous_writer_future = None 1104 self._autonomous_writer_future_stop_requessted = False 1105 1106 return result 1107 1108 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 1109 """_summary_ 1110 1111 Args: 1112 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1113 1114 Returns: 1115 _type_: _description_ 1116 """ 1117 return await self.stop_autonomous_writer(timeout) 1118 1119 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 1120 if lower_water_size is None: 1121 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 1122 # print(f'lower_water_size: {lower_water_size}') 1123 # lower_water_size = cpu_info().l3_cache_size 1124 1125 if lower_water_size <= self._output_to_client.size(): 1126 future: Future = self._loop.create_future() 1127 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 1128 await future 1129 1130 async def aw_drain_enough(self): 1131 await self.autonomous_writer_drain_enough() 1132 1133 def optimized_write_message(self, data): 1134 self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings.message_size_len, 'little') + data) 1135 1136 def owrite_message(self, data): 1137 self.optimized_write_message(data) 1138 1139 async def send_message(self, data): 1140 self.optimized_write_message(data) 1141 await self.fdrain() 1142 1143 @asynccontextmanager 1144 async def autonomous_writer(self): 1145 self.start_autonomous_writer() 1146 try: 1147 yield 1148 finally: 1149 await self.stop_autonomous_writer() 1150 1151 aw = autonomous_writer 1152 1153 def __enter__(self): 1154 pass 1155 1156 def __exit__(self, exc_type, exc, tb): 1157 self.close()
Wraps a Transport.
This exposes write(), writelines(), [can_]write_eof(), get_extra_info() and close(). It adds drain() which returns an optional Future on which you can wait for flow control. It also adds a transport property which references the Transport directly.
1030 async def partial_drain(self): 1031 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 1032 while another_piece_of_data: 1033 self.write(another_piece_of_data) 1034 await self.drain() 1035 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
1083 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 1084 """_summary_ 1085 1086 Args: 1087 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1088 1089 Returns: 1090 _type_: _description_ 1091 """ 1092 result = None 1093 if timeout is None: 1094 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 1095 1096 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 1097 self._autonomous_writer_future_stop_requessted = True 1098 if timeout: 1099 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 1100 else: 1101 result = await self._autonomous_writer_future 1102 1103 self._autonomous_writer_future = None 1104 self._autonomous_writer_future_stop_requessted = False 1105 1106 return result
_summary_
Args: timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
Returns: _type_: _description_
1108 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 1109 """_summary_ 1110 1111 Args: 1112 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 1113 1114 Returns: 1115 _type_: _description_ 1116 """ 1117 return await self.stop_autonomous_writer(timeout)
_summary_
Args: timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
Returns: _type_: _description_
1119 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 1120 if lower_water_size is None: 1121 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 1122 # print(f'lower_water_size: {lower_water_size}') 1123 # lower_water_size = cpu_info().l3_cache_size 1124 1125 if lower_water_size <= self._output_to_client.size(): 1126 future: Future = self._loop.create_future() 1127 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 1128 await future
Inherited Members
- asyncio.streams.StreamWriter
- transport
- write
- writelines
- write_eof
- can_write_eof
- close
- is_closing
- wait_closed
- get_extra_info
- drain