cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.udp_efficient_streams
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37__all__ = ['StreamType', 'GateSecurityPolicy', 'StreamManagerIOCoreMemoryManagement', 'UdpStreamManager', 'UdpStreamReader', 'UdpStreamReaderProtocol', 'UdpStreamWriter'] 38 39 40import warnings 41import asyncio 42from asyncio.exceptions import IncompleteReadError, LimitOverrunError 43from asyncio import streams 44from asyncio.streams import StreamWriter as OriginalStreamWriter 45from asyncio.streams import StreamReader as OriginalStreamReader 46from asyncio.streams import StreamReaderProtocol as OriginalStreamReaderProtocol 47from asyncio import events 48from asyncio import proactor_events 49from asyncio import selector_events 50try: 51 from asyncio import unix_events 52except ImportError: 53 pass 54 55from asyncio import coroutines 56from asyncio.tasks import sleep, Task 57from asyncio.futures import Future 58from copy import copy 59from enum import Enum 60from cengal.io.asock_io.versions.v_1.recv_buff_size_computer.recv_buff_size_computer__python import RecvBuffSizeComputer 61# from cengal.io.asock_io.versions.v_1.base import IOCoreMemoryManagement 62from cengal.parallel_execution.asyncio.atasks import create_task 63from cengal.parallel_execution.asyncio.timed_yield import TimedYield 64from cengal.hardware.info.cpu import cpu_info 65# from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl 66# from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty 67# from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType 68from cengal.data_containers.dynamic_list_of_pieces.versions.v_1.dynamic_list_of_pieces__python import DynamicListOfPiecesDequeWithLengthControl 69from cengal.code_flow_control.smart_values.versions import ValueExistence 70from typing import Sequence, Tuple, Type, Set, Optional, Union, List 71from .efficient_streams_base_internal import * 72from .efficient_streams_base import * 73from .efficient_streams_abstract import * 74 75 76class UdpStreamManager(StreamManagerAbstract): 77 def __init__(self) -> None: 78 self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement() 79 self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0 80 
self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl 81 self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl 82 83 async def open_connection(self, host=None, port=None, *, 84 loop=None, limit=DEFAULT_LIMIT, 85 stream_type: StreamType = StreamType.general, stream_name: str = str(), 86 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 87 max_message_size_len: Optional[int] = None, 88 **kwds): 89 """A wrapper for create_connection() returning a (reader, writer) pair. 90 91 The reader returned is a UdpStreamReader instance; the writer is a 92 UdpStreamWriter instance. 93 94 The arguments are all the usual arguments to create_connection() 95 except protocol_factory; most common are positional host and port, 96 with various optional keyword arguments following. 97 98 Additional optional keyword arguments are loop (to set the event loop 99 instance to use) and limit (to set the buffer limit passed to the 100 UdpStreamReader). 101 102 (If you want to customize the UdpStreamReader and/or 103 UdpStreamReaderProtocol classes, just copy the code -- there's 104 really nothing special here except some convenience.) 105 """ 106 if StreamType.gate == stream_type: 107 raise ValueError(f'Wrong stream_type value: client can not be a "gate".') 108 109 if loop is None: 110 loop = events.get_event_loop() 111 else: 112 warnings.warn("The loop argument is deprecated since Python 3.8, " 113 "and scheduled for removal in Python 3.10.", 114 DeprecationWarning, stacklevel=2) 115 116 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 117 reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 118 protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop) 119 transport, _ = await loop.create_connection( 120 lambda: protocol, host, port, **kwds) 121 writer = UdpStreamWriter(transport, protocol, reader, loop) 122 return reader, writer 123 124 async def start_server(self, client_connected_cb, host=None, port=None, *, 125 loop=None, limit=DEFAULT_LIMIT, 126 stream_type: StreamType = StreamType.general, stream_name: str = str(), 127 gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None, 128 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 129 max_message_size_len: Optional[int] = None, 130 **kwds): 131 """Start a socket server, call back for each client connected. 132 133 The first parameter, `client_connected_cb`, takes two parameters: 134 client_reader, client_writer. client_reader is a UdpStreamReader 135 object, while client_writer is a UdpStreamWriter object. This 136 parameter can either be a plain callback function or a coroutine; 137 if it is a coroutine, it will be automatically converted into a 138 Task. 139 140 The rest of the arguments are all the usual arguments to 141 loop.create_server() except protocol_factory; most common are 142 positional host and port, with various optional keyword arguments 143 following. The return value is the same as loop.create_server(). 144 145 Additional optional keyword arguments are loop (to set the event loop 146 instance to use) and limit (to set the buffer limit passed to the 147 UdpStreamReader). 148 149 The return value is the same as loop.create_server(), i.e. a 150 Server object which can be used to stop the service. 
151 """ 152 if loop is None: 153 loop = events.get_event_loop() 154 else: 155 warnings.warn("The loop argument is deprecated since Python 3.8, " 156 "and scheduled for removal in Python 3.10.", 157 DeprecationWarning, stacklevel=2) 158 159 def factory(): 160 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 161 reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 162 protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb, 163 loop=loop) 164 return protocol 165 166 return await loop.create_server(factory, host, port, **kwds) 167 168 async def try_establish_message_protocol_server_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool: 169 return True 170 171 async def try_establish_message_protocol_client_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool: 172 return True 173 174 175class Server(events.AbstractServer): 176 177 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 178 ssl_handshake_timeout): 179 self._loop = loop 180 self._sockets = sockets 181 self._active_count = 0 182 self._waiters = [] 183 self._protocol_factory = protocol_factory 184 self._backlog = backlog 185 self._ssl_context = ssl_context 186 self._ssl_handshake_timeout = ssl_handshake_timeout 187 self._serving = False 188 self._serving_forever_fut = None 189 190 def __repr__(self): 191 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 192 193 def get_loop(self): 194 return self._loop 195 196 def is_serving(self): 197 return self._serving 198 199 @property 200 def sockets(self): 201 if self._sockets is None: 202 return () 203 return tuple(trsock.TransportSocket(s) for s in self._sockets) 204 205 def close(self): 206 sockets = self._sockets 207 if sockets is None: 208 return 209 self._sockets = None 210 211 for sock in sockets: 212 self._loop._stop_serving(sock) 213 214 self._serving = False 215 216 if (self._serving_forever_fut is not None and 217 not self._serving_forever_fut.done()): 218 self._serving_forever_fut.cancel() 219 self._serving_forever_fut = None 220 221 if self._active_count == 0: 222 self._wakeup() 223 224 async def start_serving(self): 225 self._start_serving() 226 # Skip one loop iteration so that all 'loop.add_reader' 227 # go through. 
228 await tasks.sleep(0, loop=self._loop) 229 230 async def serve_forever(self): 231 if self._serving_forever_fut is not None: 232 raise RuntimeError( 233 f'server {self!r} is already being awaited on serve_forever()') 234 if self._sockets is None: 235 raise RuntimeError(f'server {self!r} is closed') 236 237 self._start_serving() 238 self._serving_forever_fut = self._loop.create_future() 239 240 try: 241 await self._serving_forever_fut 242 except exceptions.CancelledError: 243 try: 244 self.close() 245 await self.wait_closed() 246 finally: 247 raise 248 finally: 249 self._serving_forever_fut = None 250 251 async def wait_closed(self): 252 if self._sockets is None or self._waiters is None: 253 return 254 waiter = self._loop.create_future() 255 self._waiters.append(waiter) 256 await waiter 257 258 259class UdpStreamReader(OriginalStreamReader, StreamReaderAbstract): 260 def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 261 self._stream_manager = manager 262 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 263 super().__init__(*args, **kwargs) 264 self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type( 265 external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size) 266 self.recv_buff_size_computer = RecvBuffSizeComputer() 267 cpu_info_inst = cpu_info() 268 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 269 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 270 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 271 # self.recv_buff_size_computer.max_recv_buff_size = 3145728 272 self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 273 # self.recv_buff_size_computer.max_recv_buff_size = 1024 274 # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 275 self.limit_by_limit: bool = True 276 self.limit_by_global_in__data_size_limit: bool = True 277 278 async def read_max(self): 279 return await self.read(self._limit) 280 281 async def read_nearly_max(self): 282 return await self.read_nearly(self._limit) 283 284 async def read_with_counter(self): 285 if self._exception is not None: 286 raise self._exception 287 288 # This used to just loop creating a new waiter hoping to 289 # collect everything in self._buffer, but that would 290 # deadlock if the subprocess sends more than self.limit 291 # bytes. So just call self.read(self._limit) until EOF. 
292 blocks = [] 293 counter = 0 294 while True: 295 block = await self.read_max() 296 counter += 1 297 if not block: 298 break 299 blocks.append(block) 300 return b''.join(blocks), counter 301 302 def __repr__(self): 303 info = ['UdpStreamReader'] 304 if self._smart_buffer.size(): 305 info.append(f'{self._smart_buffer.size()} bytes') 306 if self._eof: 307 info.append('eof') 308 if self._limit != DEFAULT_LIMIT: 309 info.append(f'limit={self._limit}') 310 if self._waiter: 311 info.append(f'waiter={self._waiter!r}') 312 if self._exception: 313 info.append(f'exception={self._exception!r}') 314 if self._transport: 315 info.append(f'transport={self._transport!r}') 316 if self._paused: 317 info.append('paused') 318 return '<{}>'.format(' '.join(info)) 319 320 def _maybe_resume_transport(self): 321 if isinstance(self._transport, ( 322 proactor_events._ProactorDatagramTransport, 323 selector_events._SelectorTransport, 324 unix_events._UnixReadPipeTransport 325 )): 326 # if hasattr(self._transport, 'max_size'): 327 try: 328 self._transport.max_size = self.recv_buff_size_computer.recv_buff_size 329 # print(f'max_size: {self._transport.max_size}') 330 except AttributeError: 331 pass 332 else: 333 print(f'Unsupported transport: {type(self._transport)}') 334 335 if self._paused \ 336 and ( 337 ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \ 338 or (self.limit_by_limit and (not self._limit)) \ 339 or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \ 340 or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \ 341 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value)) 342 ): 343 self._paused = False 344 self._transport.resume_reading() 345 346 def at_eof(self): 347 """Return True if the buffer is empty and 'feed_eof' was called.""" 348 return self._eof and not self._smart_buffer.size() 349 350 def feed_data(self, data): 351 assert not self._eof, 'feed_data after feed_eof' 352 353 if not data: 354 return 355 356 data_len = len(data) 357 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 358 self._smart_buffer.add_piece_of_data(data) 359 self._wakeup_waiter() 360 361 if (self._transport is not None and 362 not self._paused 363 and ( 364 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 365 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 366 )): 367 try: 368 self._transport.pause_reading() 369 except NotImplementedError: 370 # The transport can't be paused. 371 # We'll just have to buffer all data. 372 # Forget the transport so we don't keep trying. 373 self._transport = None 374 else: 375 self._paused = True 376 377 async def readline(self): 378 """Read chunk of data from the stream until newline (b'\n') is found. 379 380 On success, return chunk that ends with newline. If only partial 381 line can be read due to EOF, return incomplete line without 382 terminating newline. When EOF was reached while no bytes read, empty 383 bytes object is returned. 384 385 If limit is reached, ValueError will be raised. In that case, if 386 newline was found, complete line including newline will be removed 387 from internal buffer. Else, internal buffer will be cleared. 
Limit is 388 compared against part of the line without newline. 389 390 If stream was paused, this function will automatically resume it if 391 needed. 392 """ 393 sep = b'\n' 394 seplen = len(sep) 395 try: 396 line = await self.readuntil(sep) 397 except IncompleteReadError as e: 398 return e.partial 399 except LimitOverrunError as e: 400 if self._smart_buffer.startswith(sep, e.consumed): 401 self._smart_buffer.get_data(e.consumed + seplen) 402 else: 403 self._smart_buffer.clear() 404 405 self._maybe_resume_transport() 406 raise ValueError(e.args[0]) 407 return line 408 409 async def readuntil(self, separator=b'\n'): 410 """Read data from the stream until ``separator`` is found. 411 412 On success, the data and separator will be removed from the 413 internal buffer (consumed). Returned data will include the 414 separator at the end. 415 416 Configured stream limit is used to check result. Limit sets the 417 maximal length of data that can be returned, not counting the 418 separator. 419 420 If an EOF occurs and the complete separator is still not found, 421 an IncompleteReadError exception will be raised, and the internal 422 buffer will be reset. The IncompleteReadError.partial attribute 423 may contain the separator partially. 424 425 If the data cannot be read because of over limit, a 426 LimitOverrunError exception will be raised, and the data 427 will be left in the internal buffer, so it can be read again. 428 """ 429 seplen = len(separator) 430 if seplen == 0: 431 raise ValueError('Separator should be at least one-byte string') 432 433 if self._exception is not None: 434 raise self._exception 435 436 # Consume whole buffer except last bytes, which length is 437 # one less than seplen. Let's check corner cases with 438 # separator='SEPARATOR': 439 # * we have received almost complete separator (without last 440 # byte). i.e buffer='some textSEPARATO'. In this case we 441 # can safely consume len(separator) - 1 bytes. 442 # * last byte of buffer is first byte of separator, i.e. 443 # buffer='abcdefghijklmnopqrS'. We may safely consume 444 # everything except that last byte, but this require to 445 # analyze bytes of buffer that match partial separator. 446 # This is slow and/or require FSM. For this case our 447 # implementation is not optimal, since require rescanning 448 # of data that is known to not belong to separator. In 449 # real world, separator will not be so long to notice 450 # performance problems. Even when reading MIME-encoded 451 # messages :) 452 453 # `offset` is the number of bytes from the beginning of the buffer 454 # where there is no occurrence of `separator`. 455 offset = 0 456 457 # Loop until we find `separator` in the buffer, exceed the buffer size, 458 # or an EOF has happened. 459 while True: 460 buflen = self._smart_buffer.size() 461 462 # Check if we now have enough data in the buffer for `separator` to 463 # fit. 464 if buflen - offset >= seplen: 465 isep = self._smart_buffer.find(separator, offset) 466 467 if isep != -1: 468 # `separator` is in the buffer. `isep` will be used later 469 # to retrieve the data. 470 break 471 472 # see upper comment for explanation. 473 offset = buflen + 1 - seplen 474 if offset > self._limit: 475 raise LimitOverrunError( 476 'Separator is not found, and chunk exceed the limit', 477 offset) 478 479 # Complete message (with full separator) may be present in buffer 480 # even when EOF flag is set. This may happen when the last chunk 481 # adds data which makes separator be found. 
That's why we check for 482 # EOF *ater* inspecting the buffer. 483 if self._eof: 484 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 485 raise IncompleteReadError(chunk, None) 486 487 # _wait_for_data() will resume reading if stream was paused. 488 await self._wait_for_data('readuntil') 489 490 if isep > self._limit: 491 raise LimitOverrunError( 492 'Separator is found, but chunk is longer than limit', isep) 493 494 chunk = self._smart_buffer.get_data(isep + seplen) 495 self._maybe_resume_transport() 496 return bytes(chunk) 497 498 async def read(self, n=-1): 499 """Read up to `n` bytes from the stream. 500 501 If n is not provided, or set to -1, read until EOF and return all read 502 bytes. If the EOF was received and the internal buffer is empty, return 503 an empty bytes object. 504 505 If n is zero, return empty bytes object immediately. 506 507 If n is positive, this function try to read `n` bytes, and may return 508 less or equal bytes than requested, but at least one byte. If EOF was 509 received before any byte is read, this function returns empty byte 510 object. 511 512 Returned value is not limited with limit, configured at stream 513 creation. 514 515 If stream was paused, this function will automatically resume it if 516 needed. 517 """ 518 519 if self._exception is not None: 520 raise self._exception 521 522 if n == 0: 523 return b'' 524 525 if n < 0: 526 # This used to just loop creating a new waiter hoping to 527 # collect everything in self._buffer, but that would 528 # deadlock if the subprocess sends more than self.limit 529 # bytes. So just call self.read(self._limit) until EOF. 530 blocks = [] 531 while True: 532 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 533 if not block: 534 break 535 blocks.append(block) 536 return b''.join(blocks) 537 538 if not self._smart_buffer.size() and not self._eof: 539 await self._wait_for_data('read') 540 541 # This will work right even if buffer is less than n bytes 542 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 543 544 self._maybe_resume_transport() 545 return data 546 547 async def read_nearly(self, n=-1): 548 """Read up to `n` bytes from the stream. 549 550 If n is not provided, or set to -1, read until EOF and return all read 551 bytes. If the EOF was received and the internal buffer is empty, return 552 an empty bytes object. 553 554 If n is zero, return empty bytes object immediately. 555 556 If n is positive, this function try to read `n` bytes, and may return 557 less or equal bytes than requested, but at least one byte. If EOF was 558 received before any byte is read, this function returns empty byte 559 object. 560 561 Returned value is not limited with limit, configured at stream 562 creation. 563 564 If stream was paused, this function will automatically resume it if 565 needed. 566 """ 567 568 if self._exception is not None: 569 raise self._exception 570 571 if n == 0: 572 return b'' 573 574 if n < 0: 575 # This used to just loop creating a new waiter hoping to 576 # collect everything in self._buffer, but that would 577 # deadlock if the subprocess sends more than self.limit 578 # bytes. So just call self.read(self._limit) until EOF. 
579 blocks = [] 580 while True: 581 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 582 if not block: 583 break 584 blocks.append(block) 585 return b''.join(blocks) 586 587 if not self._smart_buffer.size() and not self._eof: 588 await self._wait_for_data('read') 589 590 # This will work right even if buffer is less than n bytes 591 data = self._smart_buffer.get_data_nearly(n) 592 593 self._maybe_resume_transport() 594 return data 595 596 async def readexactly(self, n): 597 """Read exactly `n` bytes. 598 599 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 600 read. The IncompleteReadError.partial attribute of the exception will 601 contain the partial read bytes. 602 603 if n is zero, return empty bytes object. 604 605 Returned value is not limited with limit, configured at stream 606 creation. 607 608 If stream was paused, this function will automatically resume it if 609 needed. 610 """ 611 if n < 0: 612 raise ValueError('readexactly size can not be less than zero') 613 614 if self._exception is not None: 615 raise self._exception 616 617 if n == 0: 618 return b'' 619 620 while self._smart_buffer.size() < n: 621 if self._eof: 622 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 623 raise IncompleteReadError(incomplete, n) 624 625 await self._wait_for_data('readexactly') 626 627 if self._smart_buffer.size() == n: 628 data = self._smart_buffer.get_data(self._smart_buffer.size()) 629 else: 630 data = self._smart_buffer.get_data(n) 631 632 self._maybe_resume_transport() 633 return data 634 635 async def readonly_exactly(self, n): 636 if n < 0: 637 raise ValueError('readexactly size can not be less than zero') 638 639 if self._exception is not None: 640 raise self._exception 641 642 if n == 0: 643 return b'' 644 645 while self._smart_buffer.size() < n: 646 if self._eof: 647 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 648 raise IncompleteReadError(incomplete, n) 649 650 await self._wait_for_data('readexactly') 651 652 if self._smart_buffer.size() == n: 653 data = self._smart_buffer.read_data(self._smart_buffer.size()) 654 else: 655 data = self._smart_buffer.read_data(n) 656 657 self._maybe_resume_transport() 658 return data 659 660 async def read_message(self): 661 message_len_encoded = await self.readexactly(self._message_protocol_settings._message_size_len) 662 message_len = int.from_bytes(message_len_encoded, 'little') 663 return await self.readexactly(message_len) 664 665 def message_awailable(self) -> bool: 666 message_size_len = self._message_protocol_settings._message_size_len 667 if self._smart_buffer.size() < message_size_len: 668 return False 669 670 message_len_encoded = self._smart_buffer.get_data(message_size_len) 671 message_len = int.from_bytes(message_len_encoded, 'little') 672 if self._smart_buffer.size() < (message_size_len + message_len): 673 return False 674 675 return True 676 677 def transport_pause_reading(self): 678 try: 679 self._transport.pause_reading() 680 except NotImplementedError: 681 # The transport can't be paused. 682 # We'll just have to buffer all data. 683 # Forget the transport so we don't keep trying. 
684 pass 685 else: 686 self._paused = True 687 688 def transport_resume_reading(self): 689 self._paused = False 690 self._transport.resume_reading() 691 692 693class UdpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract): 694 def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 695 self._stream_manager = manager 696 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 697 super().__init__(*args, **kwargs) 698 699 def connection_made(self, transport): 700 if self._reject_connection: 701 context = { 702 'message': ('An open stream was garbage collected prior to ' 703 'establishing network connection; ' 704 'call "stream.close()" explicitly.') 705 } 706 if self._source_traceback: 707 context['source_traceback'] = self._source_traceback 708 self._loop.call_exception_handler(context) 709 transport.abort() 710 return 711 712 self._transport = transport 713 reader = self._stream_reader 714 if reader is not None: 715 reader.set_transport(transport) 716 717 self._over_ssl = transport.get_extra_info('sslcontext') is not None 718 if self._client_connected_cb is not None: 719 self._stream_writer = UdpStreamWriter(transport, self, 720 reader, 721 self._loop) 722 res = self._client_connected_cb(reader, 723 self._stream_writer) 724 if coroutines.iscoroutine(res): 725 self._loop.create_task(res) 726 727 self._strong_reader = None 728 729 730class UdpStreamWriter(OriginalStreamWriter, StreamWriterAbstract): 731 def __init__(self, *args, **kwargs) -> None: 732 super().__init__(*args, **kwargs) 733 self._stream_manager: UdpStreamManager = self._protocol._stream_manager 734 self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 735 external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 736 self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 737 self._autonomous_writer_future: Task = None 738 self._autonomous_writer_future_stop_requessted: bool = False 739 self._autonomous_writer_drain_enough_futures: List[Future] = list() 740 741 def optimized_write(self, data): 742 self._output_to_client.add_piece_of_data(data) 743 # self.write(data) 744 745 def owrite(self, data): 746 return self.optimized_write(data) 747 748 async def partial_drain(self): 749 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 750 while another_piece_of_data: 751 self.write(another_piece_of_data) 752 await self.drain() 753 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 754 755 async def pdrain(self): 756 return await self.partial_drain() 757 758 async def full_drain(self): 759 await self.pdrain() 760 rest_of_the_data_size = self._output_to_client.size() 761 if rest_of_the_data_size: 762 another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size) 763 self.write(another_piece_of_data) 764 await self.drain() 765 766 async def fdrain(self): 767 return await self.full_drain() 768 769 def _release_autonomous_writer_waiters(self): 770 current_size = self._output_to_client.size() 771 autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures 772 self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)() 773 for item in 
autonomous_writer_drain_enough_futures_buff: 774 lower_water_size, future = item 775 if current_size < lower_water_size: 776 if (not future.done()) and (not future.cancelled()): 777 future.set_result(None) 778 779 if (not future.done()) and (not future.cancelled()): 780 self._autonomous_writer_drain_enough_futures.append(item) 781 782 async def _autonomous_writer_impl(self): 783 ty = TimedYield(0) 784 while not self._autonomous_writer_future_stop_requessted: 785 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 786 self._release_autonomous_writer_waiters() 787 while another_piece_of_data: 788 self.write(another_piece_of_data) 789 await self.drain() 790 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 791 self._release_autonomous_writer_waiters() 792 793 await ty() 794 795 def start_autonomous_writer(self): 796 self._autonomous_writer_future = create_task(self._autonomous_writer_impl) 797 798 def start_aw(self): 799 return self.start_autonomous_writer() 800 801 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 802 """_summary_ 803 804 Args: 805 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 806 807 Returns: 808 _type_: _description_ 809 """ 810 result = None 811 if timeout is None: 812 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 813 814 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 815 self._autonomous_writer_future_stop_requessted = True 816 if timeout: 817 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 818 else: 819 result = await self._autonomous_writer_future 820 821 self._autonomous_writer_future = None 822 self._autonomous_writer_future_stop_requessted = False 823 824 return result 825 826 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 827 """_summary_ 828 829 Args: 830 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 831 832 Returns: 833 _type_: _description_ 834 """ 835 return await self.stop_autonomous_writer(timeout) 836 837 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 838 if lower_water_size is None: 839 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 840 # print(f'lower_water_size: {lower_water_size}') 841 # lower_water_size = cpu_info().l3_cache_size 842 843 if lower_water_size <= self._output_to_client.size(): 844 future: Future = self._loop.create_future() 845 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 846 await future 847 848 async def aw_drain_enough(self): 849 await self.autonomous_writer_drain_enough() 850 851 def optimized_write_message(self, data): 852 self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings._message_size_len, 'little') + data) 853 854 def owrite_message(self, data): 855 self.optimized_write_message(data) 856 857 async def send_message(self, data): 858 self.optimized_write_message(data) 859 await self.fdrain()
class StreamType(Enum):
    general = 0
    message_based_anonymous = 1
    message_based_names = 2
    gate = 3
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class GateSecurityPolicy(Enum):
    # the gate will either allow only the selected stream names to connect, or ban them
    allowed = 0
    disabled = 1
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class StreamManagerIOCoreMemoryManagement(IOCoreMemoryManagement):
    def __init__(self):
        super(StreamManagerIOCoreMemoryManagement, self).__init__()

        self.socket_write_fixed_buffer_size = ValueExistence(True,
                                                             cpu_info().l2_cache_size_per_virtual_core)

    def link_to(self, parent):
        super(StreamManagerIOCoreMemoryManagement, self).link_to(parent)
        try:
            self.socket_write_fixed_buffer_size = parent.socket_write_fixed_buffer_size
        except AttributeError:
            pass
Inherited Members
- cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement
- global__data_size_limit
- global__data_full_size
- global__deletable_data_full_size
- global_other__data_size_limit
- global_other__data_full_size
- global_other__deletable_data_full_size
- global_in__data_size_limit
- global_in__data_full_size
- global_in__deletable_data_full_size
- global_out__data_size_limit
- global_out__data_full_size
- global_out__deletable_data_full_size
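StreamManagerIOCoreMemoryManagement.link_to() shares the write-buffer sizing value with a parent memory-management object. A minimal sketch of that behaviour, assuming the parent is simply another StreamManagerIOCoreMemoryManagement instance (the names below are illustrative, not part of the module):

# Hedged sketch: coordinate buffer sizing between two managers.
manager = UdpStreamManager()
parent_memory_management = StreamManagerIOCoreMemoryManagement()  # hypothetical parent
manager.io_memory_management.link_to(parent_memory_management)
# After linking, both sides reference the same ValueExistence object,
# so a change made through the parent is visible to the stream manager.
assert manager.io_memory_management.socket_write_fixed_buffer_size is \
       parent_memory_management.socket_write_fixed_buffer_size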
77class UdpStreamManager(StreamManagerAbstract): 78 def __init__(self) -> None: 79 self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement() 80 self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0 81 self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl 82 self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl 83 84 async def open_connection(self, host=None, port=None, *, 85 loop=None, limit=DEFAULT_LIMIT, 86 stream_type: StreamType = StreamType.general, stream_name: str = str(), 87 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 88 max_message_size_len: Optional[int] = None, 89 **kwds): 90 """A wrapper for create_connection() returning a (reader, writer) pair. 91 92 The reader returned is a UdpStreamReader instance; the writer is a 93 UdpStreamWriter instance. 94 95 The arguments are all the usual arguments to create_connection() 96 except protocol_factory; most common are positional host and port, 97 with various optional keyword arguments following. 98 99 Additional optional keyword arguments are loop (to set the event loop 100 instance to use) and limit (to set the buffer limit passed to the 101 UdpStreamReader). 102 103 (If you want to customize the UdpStreamReader and/or 104 UdpStreamReaderProtocol classes, just copy the code -- there's 105 really nothing special here except some convenience.) 106 """ 107 if StreamType.gate == stream_type: 108 raise ValueError(f'Wrong stream_type value: client can not be a "gate".') 109 110 if loop is None: 111 loop = events.get_event_loop() 112 else: 113 warnings.warn("The loop argument is deprecated since Python 3.8, " 114 "and scheduled for removal in Python 3.10.", 115 DeprecationWarning, stacklevel=2) 116 117 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 118 reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 119 protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop) 120 transport, _ = await loop.create_connection( 121 lambda: protocol, host, port, **kwds) 122 writer = UdpStreamWriter(transport, protocol, reader, loop) 123 return reader, writer 124 125 async def start_server(self, client_connected_cb, host=None, port=None, *, 126 loop=None, limit=DEFAULT_LIMIT, 127 stream_type: StreamType = StreamType.general, stream_name: str = str(), 128 gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None, 129 protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None, 130 max_message_size_len: Optional[int] = None, 131 **kwds): 132 """Start a socket server, call back for each client connected. 133 134 The first parameter, `client_connected_cb`, takes two parameters: 135 client_reader, client_writer. client_reader is a UdpStreamReader 136 object, while client_writer is a UdpStreamWriter object. This 137 parameter can either be a plain callback function or a coroutine; 138 if it is a coroutine, it will be automatically converted into a 139 Task. 140 141 The rest of the arguments are all the usual arguments to 142 loop.create_server() except protocol_factory; most common are 143 positional host and port, with various optional keyword arguments 144 following. The return value is the same as loop.create_server(). 
145 146 Additional optional keyword arguments are loop (to set the event loop 147 instance to use) and limit (to set the buffer limit passed to the 148 UdpStreamReader). 149 150 The return value is the same as loop.create_server(), i.e. a 151 Server object which can be used to stop the service. 152 """ 153 if loop is None: 154 loop = events.get_event_loop() 155 else: 156 warnings.warn("The loop argument is deprecated since Python 3.8, " 157 "and scheduled for removal in Python 3.10.", 158 DeprecationWarning, stacklevel=2) 159 160 def factory(): 161 message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len) 162 reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop) 163 protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb, 164 loop=loop) 165 return protocol 166 167 return await loop.create_server(factory, host, port, **kwds) 168 169 async def try_establish_message_protocol_server_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool: 170 return True 171 172 async def try_establish_message_protocol_client_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool: 173 return True
    async def open_connection(self, host=None, port=None, *,
                              loop=None, limit=DEFAULT_LIMIT,
                              stream_type: StreamType = StreamType.general, stream_name: str = str(),
                              protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
                              max_message_size_len: Optional[int] = None,
                              **kwds):
        """A wrapper for create_connection() returning a (reader, writer) pair.

        The reader returned is a UdpStreamReader instance; the writer is a
        UdpStreamWriter instance.

        The arguments are all the usual arguments to create_connection()
        except protocol_factory; most common are positional host and port,
        with various optional keyword arguments following.

        Additional optional keyword arguments are loop (to set the event loop
        instance to use) and limit (to set the buffer limit passed to the
        UdpStreamReader).

        (If you want to customize the UdpStreamReader and/or
        UdpStreamReaderProtocol classes, just copy the code -- there's
        really nothing special here except some convenience.)
        """
        if StreamType.gate == stream_type:
            raise ValueError(f'Wrong stream_type value: client can not be a "gate".')

        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
        reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
        protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
        transport, _ = await loop.create_connection(
            lambda: protocol, host, port, **kwds)
        writer = UdpStreamWriter(transport, protocol, reader, loop)
        return reader, writer
A wrapper for create_connection() returning a (reader, writer) pair.
The reader returned is a UdpStreamReader instance; the writer is a UdpStreamWriter instance.
The arguments are all the usual arguments to create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.
Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the UdpStreamReader).
(If you want to customize the UdpStreamReader and/or UdpStreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.)
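A minimal client sketch built from the signature above; the host, port, and payload are hypothetical, and error handling is omitted:

import asyncio

async def main():
    manager = UdpStreamManager()
    # Hypothetical endpoint; open_connection() wraps loop.create_connection().
    reader, writer = await manager.open_connection('127.0.0.1', 9000)
    writer.optimized_write(b'ping')      # buffer the outgoing data
    await writer.full_drain()            # flush the buffered data to the transport
    response = await reader.read_max()   # read up to the configured limit
    print(response)
    writer.close()

asyncio.run(main())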
    async def start_server(self, client_connected_cb, host=None, port=None, *,
                           loop=None, limit=DEFAULT_LIMIT,
                           stream_type: StreamType = StreamType.general, stream_name: str = str(),
                           gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
                           protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
                           max_message_size_len: Optional[int] = None,
                           **kwds):
        """Start a socket server, call back for each client connected.

        The first parameter, `client_connected_cb`, takes two parameters:
        client_reader, client_writer. client_reader is a UdpStreamReader
        object, while client_writer is a UdpStreamWriter object. This
        parameter can either be a plain callback function or a coroutine;
        if it is a coroutine, it will be automatically converted into a
        Task.

        The rest of the arguments are all the usual arguments to
        loop.create_server() except protocol_factory; most common are
        positional host and port, with various optional keyword arguments
        following. The return value is the same as loop.create_server().

        Additional optional keyword arguments are loop (to set the event loop
        instance to use) and limit (to set the buffer limit passed to the
        UdpStreamReader).

        The return value is the same as loop.create_server(), i.e. a
        Server object which can be used to stop the service.
        """
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        def factory():
            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
            reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
            protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
                                               loop=loop)
            return protocol

        return await loop.create_server(factory, host, port, **kwds)
Start a socket server, call back for each client connected.
The first parameter, client_connected_cb, takes two parameters: client_reader, client_writer. client_reader is a UdpStreamReader object, while client_writer is a UdpStreamWriter object. This parameter can either be a plain callback function or a coroutine; if it is a coroutine, it will be automatically converted into a Task.
The rest of the arguments are all the usual arguments to loop.create_server() except protocol_factory; most common are positional host and port, with various optional keyword arguments following. The return value is the same as loop.create_server().
Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the UdpStreamReader).
The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service.
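A corresponding server sketch, using the same hypothetical address; the callback receives a UdpStreamReader and a UdpStreamWriter, as described above:

import asyncio

manager = UdpStreamManager()

async def handle_client(reader, writer):
    # Echo back whatever the client sent before draining.
    data = await reader.read_max()
    writer.optimized_write(data)
    await writer.full_drain()
    writer.close()

async def main():
    server = await manager.start_server(handle_client, '127.0.0.1', 9000)
    async with server:  # the returned object is the loop.create_server() Server
        await server.serve_forever()

asyncio.run(main())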
260class UdpStreamReader(OriginalStreamReader, StreamReaderAbstract): 261 def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 262 self._stream_manager = manager 263 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 264 super().__init__(*args, **kwargs) 265 self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type( 266 external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size) 267 self.recv_buff_size_computer = RecvBuffSizeComputer() 268 cpu_info_inst = cpu_info() 269 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 270 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 271 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 272 # self.recv_buff_size_computer.max_recv_buff_size = 3145728 273 self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 274 # self.recv_buff_size_computer.max_recv_buff_size = 1024 275 # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 276 self.limit_by_limit: bool = True 277 self.limit_by_global_in__data_size_limit: bool = True 278 279 async def read_max(self): 280 return await self.read(self._limit) 281 282 async def read_nearly_max(self): 283 return await self.read_nearly(self._limit) 284 285 async def read_with_counter(self): 286 if self._exception is not None: 287 raise self._exception 288 289 # This used to just loop creating a new waiter hoping to 290 # collect everything in self._buffer, but that would 291 # deadlock if the subprocess sends more than self.limit 292 # bytes. So just call self.read(self._limit) until EOF. 
293 blocks = [] 294 counter = 0 295 while True: 296 block = await self.read_max() 297 counter += 1 298 if not block: 299 break 300 blocks.append(block) 301 return b''.join(blocks), counter 302 303 def __repr__(self): 304 info = ['UdpStreamReader'] 305 if self._smart_buffer.size(): 306 info.append(f'{self._smart_buffer.size()} bytes') 307 if self._eof: 308 info.append('eof') 309 if self._limit != DEFAULT_LIMIT: 310 info.append(f'limit={self._limit}') 311 if self._waiter: 312 info.append(f'waiter={self._waiter!r}') 313 if self._exception: 314 info.append(f'exception={self._exception!r}') 315 if self._transport: 316 info.append(f'transport={self._transport!r}') 317 if self._paused: 318 info.append('paused') 319 return '<{}>'.format(' '.join(info)) 320 321 def _maybe_resume_transport(self): 322 if isinstance(self._transport, ( 323 proactor_events._ProactorDatagramTransport, 324 selector_events._SelectorTransport, 325 unix_events._UnixReadPipeTransport 326 )): 327 # if hasattr(self._transport, 'max_size'): 328 try: 329 self._transport.max_size = self.recv_buff_size_computer.recv_buff_size 330 # print(f'max_size: {self._transport.max_size}') 331 except AttributeError: 332 pass 333 else: 334 print(f'Unsupported transport: {type(self._transport)}') 335 336 if self._paused \ 337 and ( 338 ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \ 339 or (self.limit_by_limit and (not self._limit)) \ 340 or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \ 341 or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \ 342 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value)) 343 ): 344 self._paused = False 345 self._transport.resume_reading() 346 347 def at_eof(self): 348 """Return True if the buffer is empty and 'feed_eof' was called.""" 349 return self._eof and not self._smart_buffer.size() 350 351 def feed_data(self, data): 352 assert not self._eof, 'feed_data after feed_eof' 353 354 if not data: 355 return 356 357 data_len = len(data) 358 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 359 self._smart_buffer.add_piece_of_data(data) 360 self._wakeup_waiter() 361 362 if (self._transport is not None and 363 not self._paused 364 and ( 365 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 366 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 367 )): 368 try: 369 self._transport.pause_reading() 370 except NotImplementedError: 371 # The transport can't be paused. 372 # We'll just have to buffer all data. 373 # Forget the transport so we don't keep trying. 374 self._transport = None 375 else: 376 self._paused = True 377 378 async def readline(self): 379 """Read chunk of data from the stream until newline (b'\n') is found. 380 381 On success, return chunk that ends with newline. If only partial 382 line can be read due to EOF, return incomplete line without 383 terminating newline. When EOF was reached while no bytes read, empty 384 bytes object is returned. 385 386 If limit is reached, ValueError will be raised. In that case, if 387 newline was found, complete line including newline will be removed 388 from internal buffer. Else, internal buffer will be cleared. 
Limit is 389 compared against part of the line without newline. 390 391 If stream was paused, this function will automatically resume it if 392 needed. 393 """ 394 sep = b'\n' 395 seplen = len(sep) 396 try: 397 line = await self.readuntil(sep) 398 except IncompleteReadError as e: 399 return e.partial 400 except LimitOverrunError as e: 401 if self._smart_buffer.startswith(sep, e.consumed): 402 self._smart_buffer.get_data(e.consumed + seplen) 403 else: 404 self._smart_buffer.clear() 405 406 self._maybe_resume_transport() 407 raise ValueError(e.args[0]) 408 return line 409 410 async def readuntil(self, separator=b'\n'): 411 """Read data from the stream until ``separator`` is found. 412 413 On success, the data and separator will be removed from the 414 internal buffer (consumed). Returned data will include the 415 separator at the end. 416 417 Configured stream limit is used to check result. Limit sets the 418 maximal length of data that can be returned, not counting the 419 separator. 420 421 If an EOF occurs and the complete separator is still not found, 422 an IncompleteReadError exception will be raised, and the internal 423 buffer will be reset. The IncompleteReadError.partial attribute 424 may contain the separator partially. 425 426 If the data cannot be read because of over limit, a 427 LimitOverrunError exception will be raised, and the data 428 will be left in the internal buffer, so it can be read again. 429 """ 430 seplen = len(separator) 431 if seplen == 0: 432 raise ValueError('Separator should be at least one-byte string') 433 434 if self._exception is not None: 435 raise self._exception 436 437 # Consume whole buffer except last bytes, which length is 438 # one less than seplen. Let's check corner cases with 439 # separator='SEPARATOR': 440 # * we have received almost complete separator (without last 441 # byte). i.e buffer='some textSEPARATO'. In this case we 442 # can safely consume len(separator) - 1 bytes. 443 # * last byte of buffer is first byte of separator, i.e. 444 # buffer='abcdefghijklmnopqrS'. We may safely consume 445 # everything except that last byte, but this require to 446 # analyze bytes of buffer that match partial separator. 447 # This is slow and/or require FSM. For this case our 448 # implementation is not optimal, since require rescanning 449 # of data that is known to not belong to separator. In 450 # real world, separator will not be so long to notice 451 # performance problems. Even when reading MIME-encoded 452 # messages :) 453 454 # `offset` is the number of bytes from the beginning of the buffer 455 # where there is no occurrence of `separator`. 456 offset = 0 457 458 # Loop until we find `separator` in the buffer, exceed the buffer size, 459 # or an EOF has happened. 460 while True: 461 buflen = self._smart_buffer.size() 462 463 # Check if we now have enough data in the buffer for `separator` to 464 # fit. 465 if buflen - offset >= seplen: 466 isep = self._smart_buffer.find(separator, offset) 467 468 if isep != -1: 469 # `separator` is in the buffer. `isep` will be used later 470 # to retrieve the data. 471 break 472 473 # see upper comment for explanation. 474 offset = buflen + 1 - seplen 475 if offset > self._limit: 476 raise LimitOverrunError( 477 'Separator is not found, and chunk exceed the limit', 478 offset) 479 480 # Complete message (with full separator) may be present in buffer 481 # even when EOF flag is set. This may happen when the last chunk 482 # adds data which makes separator be found. 
That's why we check for 483 # EOF *ater* inspecting the buffer. 484 if self._eof: 485 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 486 raise IncompleteReadError(chunk, None) 487 488 # _wait_for_data() will resume reading if stream was paused. 489 await self._wait_for_data('readuntil') 490 491 if isep > self._limit: 492 raise LimitOverrunError( 493 'Separator is found, but chunk is longer than limit', isep) 494 495 chunk = self._smart_buffer.get_data(isep + seplen) 496 self._maybe_resume_transport() 497 return bytes(chunk) 498 499 async def read(self, n=-1): 500 """Read up to `n` bytes from the stream. 501 502 If n is not provided, or set to -1, read until EOF and return all read 503 bytes. If the EOF was received and the internal buffer is empty, return 504 an empty bytes object. 505 506 If n is zero, return empty bytes object immediately. 507 508 If n is positive, this function try to read `n` bytes, and may return 509 less or equal bytes than requested, but at least one byte. If EOF was 510 received before any byte is read, this function returns empty byte 511 object. 512 513 Returned value is not limited with limit, configured at stream 514 creation. 515 516 If stream was paused, this function will automatically resume it if 517 needed. 518 """ 519 520 if self._exception is not None: 521 raise self._exception 522 523 if n == 0: 524 return b'' 525 526 if n < 0: 527 # This used to just loop creating a new waiter hoping to 528 # collect everything in self._buffer, but that would 529 # deadlock if the subprocess sends more than self.limit 530 # bytes. So just call self.read(self._limit) until EOF. 531 blocks = [] 532 while True: 533 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 534 if not block: 535 break 536 blocks.append(block) 537 return b''.join(blocks) 538 539 if not self._smart_buffer.size() and not self._eof: 540 await self._wait_for_data('read') 541 542 # This will work right even if buffer is less than n bytes 543 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 544 545 self._maybe_resume_transport() 546 return data 547 548 async def read_nearly(self, n=-1): 549 """Read up to `n` bytes from the stream. 550 551 If n is not provided, or set to -1, read until EOF and return all read 552 bytes. If the EOF was received and the internal buffer is empty, return 553 an empty bytes object. 554 555 If n is zero, return empty bytes object immediately. 556 557 If n is positive, this function try to read `n` bytes, and may return 558 less or equal bytes than requested, but at least one byte. If EOF was 559 received before any byte is read, this function returns empty byte 560 object. 561 562 Returned value is not limited with limit, configured at stream 563 creation. 564 565 If stream was paused, this function will automatically resume it if 566 needed. 567 """ 568 569 if self._exception is not None: 570 raise self._exception 571 572 if n == 0: 573 return b'' 574 575 if n < 0: 576 # This used to just loop creating a new waiter hoping to 577 # collect everything in self._buffer, but that would 578 # deadlock if the subprocess sends more than self.limit 579 # bytes. So just call self.read(self._limit) until EOF. 
580 blocks = [] 581 while True: 582 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 583 if not block: 584 break 585 blocks.append(block) 586 return b''.join(blocks) 587 588 if not self._smart_buffer.size() and not self._eof: 589 await self._wait_for_data('read') 590 591 # This will work right even if buffer is less than n bytes 592 data = self._smart_buffer.get_data_nearly(n) 593 594 self._maybe_resume_transport() 595 return data 596 597 async def readexactly(self, n): 598 """Read exactly `n` bytes. 599 600 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 601 read. The IncompleteReadError.partial attribute of the exception will 602 contain the partial read bytes. 603 604 if n is zero, return empty bytes object. 605 606 Returned value is not limited with limit, configured at stream 607 creation. 608 609 If stream was paused, this function will automatically resume it if 610 needed. 611 """ 612 if n < 0: 613 raise ValueError('readexactly size can not be less than zero') 614 615 if self._exception is not None: 616 raise self._exception 617 618 if n == 0: 619 return b'' 620 621 while self._smart_buffer.size() < n: 622 if self._eof: 623 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 624 raise IncompleteReadError(incomplete, n) 625 626 await self._wait_for_data('readexactly') 627 628 if self._smart_buffer.size() == n: 629 data = self._smart_buffer.get_data(self._smart_buffer.size()) 630 else: 631 data = self._smart_buffer.get_data(n) 632 633 self._maybe_resume_transport() 634 return data 635 636 async def readonly_exactly(self, n): 637 if n < 0: 638 raise ValueError('readexactly size can not be less than zero') 639 640 if self._exception is not None: 641 raise self._exception 642 643 if n == 0: 644 return b'' 645 646 while self._smart_buffer.size() < n: 647 if self._eof: 648 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 649 raise IncompleteReadError(incomplete, n) 650 651 await self._wait_for_data('readexactly') 652 653 if self._smart_buffer.size() == n: 654 data = self._smart_buffer.read_data(self._smart_buffer.size()) 655 else: 656 data = self._smart_buffer.read_data(n) 657 658 self._maybe_resume_transport() 659 return data 660 661 async def read_message(self): 662 message_len_encoded = await self.readexactly(self._message_protocol_settings._message_size_len) 663 message_len = int.from_bytes(message_len_encoded, 'little') 664 return await self.readexactly(message_len) 665 666 def message_awailable(self) -> bool: 667 message_size_len = self._message_protocol_settings._message_size_len 668 if self._smart_buffer.size() < message_size_len: 669 return False 670 671 message_len_encoded = self._smart_buffer.get_data(message_size_len) 672 message_len = int.from_bytes(message_len_encoded, 'little') 673 if self._smart_buffer.size() < (message_size_len + message_len): 674 return False 675 676 return True 677 678 def transport_pause_reading(self): 679 try: 680 self._transport.pause_reading() 681 except NotImplementedError: 682 # The transport can't be paused. 683 # We'll just have to buffer all data. 684 # Forget the transport so we don't keep trying. 685 pass 686 else: 687 self._paused = True 688 689 def transport_resume_reading(self): 690 self._paused = False 691 self._transport.resume_reading()
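read_message() and message_awailable() in the listing above rely on a simple length-prefixed framing: the writer emits len(data) encoded little-endian into message_size_len bytes, followed by the payload, and the reader reverses that. A standalone sketch of the framing (the 4-byte prefix width is an assumption; the real value comes from MessageProtocolSettings):

def frame_message(payload: bytes, message_size_len: int = 4) -> bytes:
    # Mirrors UdpStreamWriter.optimized_write_message: little-endian length prefix, then payload.
    return len(payload).to_bytes(message_size_len, 'little') + payload

def unframe_message(buffer: bytes, message_size_len: int = 4) -> bytes:
    # Mirrors UdpStreamReader.read_message: decode the prefix, then slice the payload.
    message_len = int.from_bytes(buffer[:message_size_len], 'little')
    return buffer[message_size_len:message_size_len + message_len]

assert unframe_message(frame_message(b'hello')) == b'hello'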
261 def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 262 self._stream_manager = manager 263 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 264 super().__init__(*args, **kwargs) 265 self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type( 266 external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size) 267 self.recv_buff_size_computer = RecvBuffSizeComputer() 268 cpu_info_inst = cpu_info() 269 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size 270 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core 271 # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core 272 # self.recv_buff_size_computer.max_recv_buff_size = 3145728 273 self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2 274 # self.recv_buff_size_computer.max_recv_buff_size = 1024 275 # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}') 276 self.limit_by_limit: bool = True 277 self.limit_by_global_in__data_size_limit: bool = True
285 async def read_with_counter(self): 286 if self._exception is not None: 287 raise self._exception 288 289 # This used to just loop creating a new waiter hoping to 290 # collect everything in self._buffer, but that would 291 # deadlock if the subprocess sends more than self.limit 292 # bytes. So just call self.read(self._limit) until EOF. 293 blocks = [] 294 counter = 0 295 while True: 296 block = await self.read_max() 297 counter += 1 298 if not block: 299 break 300 blocks.append(block) 301 return b''.join(blocks), counter
347 def at_eof(self): 348 """Return True if the buffer is empty and 'feed_eof' was called.""" 349 return self._eof and not self._smart_buffer.size()
Return True if the buffer is empty and 'feed_eof' was called.
351 def feed_data(self, data): 352 assert not self._eof, 'feed_data after feed_eof' 353 354 if not data: 355 return 356 357 data_len = len(data) 358 self.recv_buff_size_computer.calc_new_recv_buff_size(data_len) 359 self._smart_buffer.add_piece_of_data(data) 360 self._wakeup_waiter() 361 362 if (self._transport is not None and 363 not self._paused 364 and ( 365 (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit) 366 or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value))) 367 )): 368 try: 369 self._transport.pause_reading() 370 except NotImplementedError: 371 # The transport can't be paused. 372 # We'll just have to buffer all data. 373 # Forget the transport so we don't keep trying. 374 self._transport = None 375 else: 376 self._paused = True
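To make the flow-control branch above easier to follow, here is the pause condition restated as a standalone predicate. This is an illustrative sketch only, not part of the module; it simply mirrors the attributes used by feed_data above.

def _should_pause_reading(reader) -> bool:
    # Pause only when a transport exists and reading is not already paused.
    if reader._transport is None or reader._paused:
        return False
    # Per-stream limit: more than twice the configured limit is buffered.
    over_local_limit = (
        reader.limit_by_limit
        and reader._smart_buffer.size() > 2 * reader._limit
    )
    # Global limit: total buffered input across streams exceeds the manager's cap.
    mm = reader._stream_manager.io_memory_management
    over_global_limit = (
        reader.limit_by_global_in__data_size_limit
        and mm.global_in__data_full_size.value > mm.global_in__data_size_limit.value
    )
    return over_local_limit or over_global_limit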
378 async def readline(self): 379 """Read chunk of data from the stream until newline (b'\n') is found. 380 381 On success, return chunk that ends with newline. If only partial 382 line can be read due to EOF, return incomplete line without 383 terminating newline. When EOF was reached while no bytes read, empty 384 bytes object is returned. 385 386 If limit is reached, ValueError will be raised. In that case, if 387 newline was found, complete line including newline will be removed 388 from internal buffer. Else, internal buffer will be cleared. Limit is 389 compared against part of the line without newline. 390 391 If stream was paused, this function will automatically resume it if 392 needed. 393 """ 394 sep = b'\n' 395 seplen = len(sep) 396 try: 397 line = await self.readuntil(sep) 398 except IncompleteReadError as e: 399 return e.partial 400 except LimitOverrunError as e: 401 if self._smart_buffer.startswith(sep, e.consumed): 402 self._smart_buffer.get_data(e.consumed + seplen) 403 else: 404 self._smart_buffer.clear() 405 406 self._maybe_resume_transport() 407 raise ValueError(e.args[0]) 408 return line
Read a chunk of data from the stream until a newline (b'\n') is found.

On success, return the chunk that ends with the newline. If only a partial line can be read due to EOF, return the incomplete line without the terminating newline. If EOF is reached before any bytes are read, an empty bytes object is returned.

If the limit is reached, ValueError is raised. In that case, if the newline was found, the complete line including the newline is removed from the internal buffer; otherwise, the internal buffer is cleared. The limit is compared against the part of the line without the newline.

If the stream was paused, this function will automatically resume it if needed.
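A minimal usage sketch, assuming reader is a connected UdpStreamReader obtained elsewhere (connection setup is outside this fragment):

async def print_lines(reader: 'UdpStreamReader') -> None:
    # readline() returns b'' once EOF is reached and the buffer is empty.
    while True:
        line = await reader.readline()
        if not line:
            break
        print(line.rstrip(b'\n').decode('utf-8', errors='replace'))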
410 async def readuntil(self, separator=b'\n'): 411 """Read data from the stream until ``separator`` is found. 412 413 On success, the data and separator will be removed from the 414 internal buffer (consumed). Returned data will include the 415 separator at the end. 416 417 Configured stream limit is used to check result. Limit sets the 418 maximal length of data that can be returned, not counting the 419 separator. 420 421 If an EOF occurs and the complete separator is still not found, 422 an IncompleteReadError exception will be raised, and the internal 423 buffer will be reset. The IncompleteReadError.partial attribute 424 may contain the separator partially. 425 426 If the data cannot be read because of over limit, a 427 LimitOverrunError exception will be raised, and the data 428 will be left in the internal buffer, so it can be read again. 429 """ 430 seplen = len(separator) 431 if seplen == 0: 432 raise ValueError('Separator should be at least one-byte string') 433 434 if self._exception is not None: 435 raise self._exception 436 437 # Consume whole buffer except last bytes, which length is 438 # one less than seplen. Let's check corner cases with 439 # separator='SEPARATOR': 440 # * we have received almost complete separator (without last 441 # byte). i.e buffer='some textSEPARATO'. In this case we 442 # can safely consume len(separator) - 1 bytes. 443 # * last byte of buffer is first byte of separator, i.e. 444 # buffer='abcdefghijklmnopqrS'. We may safely consume 445 # everything except that last byte, but this require to 446 # analyze bytes of buffer that match partial separator. 447 # This is slow and/or require FSM. For this case our 448 # implementation is not optimal, since require rescanning 449 # of data that is known to not belong to separator. In 450 # real world, separator will not be so long to notice 451 # performance problems. Even when reading MIME-encoded 452 # messages :) 453 454 # `offset` is the number of bytes from the beginning of the buffer 455 # where there is no occurrence of `separator`. 456 offset = 0 457 458 # Loop until we find `separator` in the buffer, exceed the buffer size, 459 # or an EOF has happened. 460 while True: 461 buflen = self._smart_buffer.size() 462 463 # Check if we now have enough data in the buffer for `separator` to 464 # fit. 465 if buflen - offset >= seplen: 466 isep = self._smart_buffer.find(separator, offset) 467 468 if isep != -1: 469 # `separator` is in the buffer. `isep` will be used later 470 # to retrieve the data. 471 break 472 473 # see upper comment for explanation. 474 offset = buflen + 1 - seplen 475 if offset > self._limit: 476 raise LimitOverrunError( 477 'Separator is not found, and chunk exceed the limit', 478 offset) 479 480 # Complete message (with full separator) may be present in buffer 481 # even when EOF flag is set. This may happen when the last chunk 482 # adds data which makes separator be found. That's why we check for 483 # EOF *ater* inspecting the buffer. 484 if self._eof: 485 chunk = self._smart_buffer.get_data(self._smart_buffer.size()) 486 raise IncompleteReadError(chunk, None) 487 488 # _wait_for_data() will resume reading if stream was paused. 489 await self._wait_for_data('readuntil') 490 491 if isep > self._limit: 492 raise LimitOverrunError( 493 'Separator is found, but chunk is longer than limit', isep) 494 495 chunk = self._smart_buffer.get_data(isep + seplen) 496 self._maybe_resume_transport() 497 return bytes(chunk)
Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). The returned data includes the separator at the end.

The configured stream limit is used to check the result. The limit sets the maximal length of data that can be returned, not counting the separator.

If EOF occurs and the complete separator has still not been found, an IncompleteReadError exception is raised and the internal buffer is reset. The IncompleteReadError.partial attribute may contain a partial separator.

If the data cannot be read because the limit is exceeded, a LimitOverrunError exception is raised and the data is left in the internal buffer, so it can be read again.
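A hedged usage sketch for readuntil(), assuming reader is a connected UdpStreamReader; the record terminator b'\r\n' is a hypothetical choice used only for illustration:

from asyncio.exceptions import IncompleteReadError, LimitOverrunError

async def read_record(reader: 'UdpStreamReader') -> bytes:
    try:
        # The returned chunk includes the separator at the end.
        return await reader.readuntil(separator=b'\r\n')
    except IncompleteReadError as e:
        # EOF before the separator: e.partial holds whatever was buffered.
        return e.partial
    except LimitOverrunError:
        # Too much data buffered without a separator; the data is left in the
        # buffer and can still be consumed with read().
        raise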
499 async def read(self, n=-1): 500 """Read up to `n` bytes from the stream. 501 502 If n is not provided, or set to -1, read until EOF and return all read 503 bytes. If the EOF was received and the internal buffer is empty, return 504 an empty bytes object. 505 506 If n is zero, return empty bytes object immediately. 507 508 If n is positive, this function try to read `n` bytes, and may return 509 less or equal bytes than requested, but at least one byte. If EOF was 510 received before any byte is read, this function returns empty byte 511 object. 512 513 Returned value is not limited with limit, configured at stream 514 creation. 515 516 If stream was paused, this function will automatically resume it if 517 needed. 518 """ 519 520 if self._exception is not None: 521 raise self._exception 522 523 if n == 0: 524 return b'' 525 526 if n < 0: 527 # This used to just loop creating a new waiter hoping to 528 # collect everything in self._buffer, but that would 529 # deadlock if the subprocess sends more than self.limit 530 # bytes. So just call self.read(self._limit) until EOF. 531 blocks = [] 532 while True: 533 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 534 if not block: 535 break 536 blocks.append(block) 537 return b''.join(blocks) 538 539 if not self._smart_buffer.size() and not self._eof: 540 await self._wait_for_data('read') 541 542 # This will work right even if buffer is less than n bytes 543 data = self._smart_buffer.get_data(min(n, self._smart_buffer.size())) 544 545 self._maybe_resume_transport() 546 return data
Read up to n bytes from the stream.

If n is not provided, or is set to -1, read until EOF and return all bytes read. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return an empty bytes object immediately.

If n is positive, this function tries to read n bytes and may return fewer bytes than requested, but at least one byte. If EOF was received before any byte was read, this function returns an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
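A short sketch of reading in bounded chunks instead of read(-1), assuming reader is a connected UdpStreamReader; the 64 KiB chunk size is an arbitrary example value:

async def read_all_in_chunks(reader: 'UdpStreamReader', chunk_size: int = 64 * 1024) -> bytes:
    # read() returns b'' once EOF is reached and the internal buffer is empty.
    chunks = []
    while True:
        data = await reader.read(chunk_size)
        if not data:
            break
        chunks.append(data)
    return b''.join(chunks)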
548 async def read_nearly(self, n=-1): 549 """Read up to `n` bytes from the stream. 550 551 If n is not provided, or set to -1, read until EOF and return all read 552 bytes. If the EOF was received and the internal buffer is empty, return 553 an empty bytes object. 554 555 If n is zero, return empty bytes object immediately. 556 557 If n is positive, this function try to read `n` bytes, and may return 558 less or equal bytes than requested, but at least one byte. If EOF was 559 received before any byte is read, this function returns empty byte 560 object. 561 562 Returned value is not limited with limit, configured at stream 563 creation. 564 565 If stream was paused, this function will automatically resume it if 566 needed. 567 """ 568 569 if self._exception is not None: 570 raise self._exception 571 572 if n == 0: 573 return b'' 574 575 if n < 0: 576 # This used to just loop creating a new waiter hoping to 577 # collect everything in self._buffer, but that would 578 # deadlock if the subprocess sends more than self.limit 579 # bytes. So just call self.read(self._limit) until EOF. 580 blocks = [] 581 while True: 582 block = await self.read_nearly(max(self._limit, self._smart_buffer.size())) 583 if not block: 584 break 585 blocks.append(block) 586 return b''.join(blocks) 587 588 if not self._smart_buffer.size() and not self._eof: 589 await self._wait_for_data('read') 590 591 # This will work right even if buffer is less than n bytes 592 data = self._smart_buffer.get_data_nearly(n) 593 594 self._maybe_resume_transport() 595 return data
Read up to n bytes from the stream.

If n is not provided, or is set to -1, read until EOF and return all bytes read. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return an empty bytes object immediately.

If n is positive, this function tries to read n bytes and may return fewer bytes than requested, but at least one byte. If EOF was received before any byte was read, this function returns an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
597 async def readexactly(self, n): 598 """Read exactly `n` bytes. 599 600 Raise an IncompleteReadError if EOF is reached before `n` bytes can be 601 read. The IncompleteReadError.partial attribute of the exception will 602 contain the partial read bytes. 603 604 if n is zero, return empty bytes object. 605 606 Returned value is not limited with limit, configured at stream 607 creation. 608 609 If stream was paused, this function will automatically resume it if 610 needed. 611 """ 612 if n < 0: 613 raise ValueError('readexactly size can not be less than zero') 614 615 if self._exception is not None: 616 raise self._exception 617 618 if n == 0: 619 return b'' 620 621 while self._smart_buffer.size() < n: 622 if self._eof: 623 incomplete = self._smart_buffer.get_data(self._smart_buffer.size()) 624 raise IncompleteReadError(incomplete, n) 625 626 await self._wait_for_data('readexactly') 627 628 if self._smart_buffer.size() == n: 629 data = self._smart_buffer.get_data(self._smart_buffer.size()) 630 else: 631 data = self._smart_buffer.get_data(n) 632 633 self._maybe_resume_transport() 634 return data
Read exactly n bytes.

Raise an IncompleteReadError if EOF is reached before n bytes can be read. The IncompleteReadError.partial attribute of the exception will contain the partially read bytes.

If n is zero, return an empty bytes object.

The returned value is not limited by the limit configured at stream creation.

If the stream was paused, this function will automatically resume it if needed.
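A brief sketch, assuming reader is a connected UdpStreamReader; the 16-byte header size is a hypothetical value used only for illustration:

from asyncio.exceptions import IncompleteReadError

async def read_header(reader: 'UdpStreamReader') -> bytes:
    try:
        return await reader.readexactly(16)
    except IncompleteReadError as e:
        # e.partial holds the bytes that were read before EOF.
        raise ConnectionError(f'stream ended after {len(e.partial)} of 16 bytes') from e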
636 async def readonly_exactly(self, n): 637 if n < 0: 638 raise ValueError('readexactly size can not be less than zero') 639 640 if self._exception is not None: 641 raise self._exception 642 643 if n == 0: 644 return b'' 645 646 while self._smart_buffer.size() < n: 647 if self._eof: 648 incomplete = self._smart_buffer.read_data(self._smart_buffer.size()) 649 raise IncompleteReadError(incomplete, n) 650 651 await self._wait_for_data('readexactly') 652 653 if self._smart_buffer.size() == n: 654 data = self._smart_buffer.read_data(self._smart_buffer.size()) 655 else: 656 data = self._smart_buffer.read_data(n) 657 658 self._maybe_resume_transport() 659 return data
666 def message_awailable(self) -> bool: 667 message_size_len = self._message_protocol_settings._message_size_len 668 if self._smart_buffer.size() < message_size_len: 669 return False 670 671 message_len_encoded = self._smart_buffer.get_data(message_size_len) 672 message_len = int.from_bytes(message_len_encoded, 'little') 673 if self._smart_buffer.size() < (message_size_len + message_len): 674 return False 675 676 return True
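The message helpers above implement a simple length-prefixed framing: a little-endian length field of _message_size_len bytes followed by the payload. A hedged sketch of how the reader and writer sides pair up, assuming a connected UdpStreamReader/UdpStreamWriter pair that share the same MessageProtocolSettings:

async def echo_one_message(reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> None:
    # read_message() reads the length prefix, then exactly that many payload bytes.
    payload = await reader.read_message()
    # send_message() prepends the length prefix and fully drains the output buffer.
    await writer.send_message(payload)

async def poll_for_message(reader: 'UdpStreamReader'):
    # message_awailable() (spelling as in this module) is a non-blocking check
    # that a complete length-prefixed message is already buffered.
    if reader.message_awailable():
        return await reader.read_message()
    return None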
Inherited Members
- asyncio.streams.StreamReader
- exception
- set_exception
- set_transport
- feed_eof
694class UdpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract): 695 def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None: 696 self._stream_manager = manager 697 self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings 698 super().__init__(*args, **kwargs) 699 700 def connection_made(self, transport): 701 if self._reject_connection: 702 context = { 703 'message': ('An open stream was garbage collected prior to ' 704 'establishing network connection; ' 705 'call "stream.close()" explicitly.') 706 } 707 if self._source_traceback: 708 context['source_traceback'] = self._source_traceback 709 self._loop.call_exception_handler(context) 710 transport.abort() 711 return 712 713 self._transport = transport 714 reader = self._stream_reader 715 if reader is not None: 716 reader.set_transport(transport) 717 718 self._over_ssl = transport.get_extra_info('sslcontext') is not None 719 if self._client_connected_cb is not None: 720 self._stream_writer = UdpStreamWriter(transport, self, 721 reader, 722 self._loop) 723 res = self._client_connected_cb(reader, 724 self._stream_writer) 725 if coroutines.iscoroutine(res): 726 self._loop.create_task(res) 727 728 self._strong_reader = None
Helper class to adapt between Protocol and StreamReader.
(This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader to accidentally call inappropriate methods of the protocol.)
700 def connection_made(self, transport): 701 if self._reject_connection: 702 context = { 703 'message': ('An open stream was garbage collected prior to ' 704 'establishing network connection; ' 705 'call "stream.close()" explicitly.') 706 } 707 if self._source_traceback: 708 context['source_traceback'] = self._source_traceback 709 self._loop.call_exception_handler(context) 710 transport.abort() 711 return 712 713 self._transport = transport 714 reader = self._stream_reader 715 if reader is not None: 716 reader.set_transport(transport) 717 718 self._over_ssl = transport.get_extra_info('sslcontext') is not None 719 if self._client_connected_cb is not None: 720 self._stream_writer = UdpStreamWriter(transport, self, 721 reader, 722 self._loop) 723 res = self._client_connected_cb(reader, 724 self._stream_writer) 725 if coroutines.iscoroutine(res): 726 self._loop.create_task(res) 727 728 self._strong_reader = None
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
Inherited Members
- asyncio.streams.StreamReaderProtocol
- connection_lost
- data_received
- eof_received
- asyncio.streams.FlowControlMixin
- pause_writing
- resume_writing
731class UdpStreamWriter(OriginalStreamWriter, StreamWriterAbstract): 732 def __init__(self, *args, **kwargs) -> None: 733 super().__init__(*args, **kwargs) 734 self._stream_manager: UdpStreamManager = self._protocol._stream_manager 735 self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 736 external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 737 self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 738 self._autonomous_writer_future: Task = None 739 self._autonomous_writer_future_stop_requessted: bool = False 740 self._autonomous_writer_drain_enough_futures: List[Future] = list() 741 742 def optimized_write(self, data): 743 self._output_to_client.add_piece_of_data(data) 744 # self.write(data) 745 746 def owrite(self, data): 747 return self.optimized_write(data) 748 749 async def partial_drain(self): 750 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 751 while another_piece_of_data: 752 self.write(another_piece_of_data) 753 await self.drain() 754 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 755 756 async def pdrain(self): 757 return await self.partial_drain() 758 759 async def full_drain(self): 760 await self.pdrain() 761 rest_of_the_data_size = self._output_to_client.size() 762 if rest_of_the_data_size: 763 another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size) 764 self.write(another_piece_of_data) 765 await self.drain() 766 767 async def fdrain(self): 768 return await self.full_drain() 769 770 def _release_autonomous_writer_waiters(self): 771 current_size = self._output_to_client.size() 772 autonomous_writer_drain_enough_futures_buff = self._autonomous_writer_drain_enough_futures 773 self._autonomous_writer_drain_enough_futures = type(autonomous_writer_drain_enough_futures_buff)() 774 for item in autonomous_writer_drain_enough_futures_buff: 775 lower_water_size, future = item 776 if current_size < lower_water_size: 777 if (not future.done()) and (not future.cancelled()): 778 future.set_result(None) 779 780 if (not future.done()) and (not future.cancelled()): 781 self._autonomous_writer_drain_enough_futures.append(item) 782 783 async def _autonomous_writer_impl(self): 784 ty = TimedYield(0) 785 while not self._autonomous_writer_future_stop_requessted: 786 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 787 self._release_autonomous_writer_waiters() 788 while another_piece_of_data: 789 self.write(another_piece_of_data) 790 await self.drain() 791 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 792 self._release_autonomous_writer_waiters() 793 794 await ty() 795 796 def start_autonomous_writer(self): 797 self._autonomous_writer_future = create_task(self._autonomous_writer_impl) 798 799 def start_aw(self): 800 return self.start_autonomous_writer() 801 802 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 803 """_summary_ 804 805 Args: 806 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 
0 - infinit timeout; None - default timeout 807 808 Returns: 809 _type_: _description_ 810 """ 811 result = None 812 if timeout is None: 813 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 814 815 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 816 self._autonomous_writer_future_stop_requessted = True 817 if timeout: 818 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 819 else: 820 result = await self._autonomous_writer_future 821 822 self._autonomous_writer_future = None 823 self._autonomous_writer_future_stop_requessted = False 824 825 return result 826 827 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 828 """_summary_ 829 830 Args: 831 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 832 833 Returns: 834 _type_: _description_ 835 """ 836 return await self.stop_autonomous_writer(timeout) 837 838 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 839 if lower_water_size is None: 840 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 841 # print(f'lower_water_size: {lower_water_size}') 842 # lower_water_size = cpu_info().l3_cache_size 843 844 if lower_water_size <= self._output_to_client.size(): 845 future: Future = self._loop.create_future() 846 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 847 await future 848 849 async def aw_drain_enough(self): 850 await self.autonomous_writer_drain_enough() 851 852 def optimized_write_message(self, data): 853 self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings._message_size_len, 'little') + data) 854 855 def owrite_message(self, data): 856 self.optimized_write_message(data) 857 858 async def send_message(self, data): 859 self.optimized_write_message(data) 860 await self.fdrain()
Wraps a Transport.
This exposes write(), writelines(), [can_]write_eof(), get_extra_info() and close(). It adds drain() which returns an optional Future on which you can wait for flow control. It also adds a transport property which references the Transport directly.
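A minimal sketch of the buffered write path, assuming writer is a connected UdpStreamWriter: optimized_write()/owrite() only appends to the writer's internal output buffer, and nothing reaches the transport until one of the drain helpers is awaited.

async def send_payload(writer: 'UdpStreamWriter', payload: bytes) -> None:
    writer.owrite(payload)   # buffer only; no I/O yet
    await writer.fdrain()    # full_drain(): writes fixed-size pieces, then flushes the rest
    # pdrain() alone sends only complete socket_write_fixed_buffer_size-sized pieces
    # and appears to leave any smaller remainder buffered (full_drain flushes it).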
732 def __init__(self, *args, **kwargs) -> None: 733 super().__init__(*args, **kwargs) 734 self._stream_manager: UdpStreamManager = self._protocol._stream_manager 735 self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type( 736 external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size) 737 self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size 738 self._autonomous_writer_future: Task = None 739 self._autonomous_writer_future_stop_requessted: bool = False 740 self._autonomous_writer_drain_enough_futures: List[Future] = list()
749 async def partial_drain(self): 750 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value) 751 while another_piece_of_data: 752 self.write(another_piece_of_data) 753 await self.drain() 754 another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
802 async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0): 803 """_summary_ 804 805 Args: 806 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 807 808 Returns: 809 _type_: _description_ 810 """ 811 result = None 812 if timeout is None: 813 timeout = self._stream_manager.autonomous_writer_stop_default_timeout 814 815 if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted): 816 self._autonomous_writer_future_stop_requessted = True 817 if timeout: 818 result = await asyncio.wait_for(self._autonomous_writer_future, timeout) 819 else: 820 result = await self._autonomous_writer_future 821 822 self._autonomous_writer_future = None 823 self._autonomous_writer_future_stop_requessted = False 824 825 return result
Stop the background autonomous writer task and wait for it to finish.

Args: timeout (Optional[Union[int, float]], optional): how long to wait for the writer task to complete. Defaults to 0. 0 means wait without a timeout (infinite); None means use the manager's autonomous_writer_stop_default_timeout.

Returns: the result of the awaited writer task, or None if it was not running.
827 async def stop_aw(self, timeout: Optional[Union[int, float]] = 0): 828 """_summary_ 829 830 Args: 831 timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout 832 833 Returns: 834 _type_: _description_ 835 """ 836 return await self.stop_autonomous_writer(timeout)
Alias for stop_autonomous_writer().

Args: timeout (Optional[Union[int, float]], optional): how long to wait for the writer task to complete. Defaults to 0. 0 means wait without a timeout (infinite); None means use the manager's autonomous_writer_stop_default_timeout.

Returns: the result of the awaited writer task, or None if it was not running.
838 async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None): 839 if lower_water_size is None: 840 lower_water_size = self.socket_write_fixed_buffer_size.value * 3 841 # print(f'lower_water_size: {lower_water_size}') 842 # lower_water_size = cpu_info().l3_cache_size 843 844 if lower_water_size <= self._output_to_client.size(): 845 future: Future = self._loop.create_future() 846 self._autonomous_writer_drain_enough_futures.append((lower_water_size, future)) 847 await future
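A usage sketch of the autonomous writer, assuming writer is a connected UdpStreamWriter and messages is an iterable of bytes payloads; the background task drains the output buffer while the producer keeps writing, and aw_drain_enough() provides backpressure when the buffer grows too large:

async def stream_many(writer: 'UdpStreamWriter', messages) -> None:
    writer.start_aw()                        # spawn the background drain task
    try:
        for payload in messages:
            writer.owrite_message(payload)   # buffer a length-prefixed message
            await writer.aw_drain_enough()   # wait only if the output buffer is too full
    finally:
        # timeout semantics: 0 -> wait without a timeout; None -> manager default.
        await writer.stop_aw(timeout=None)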
Inherited Members
- asyncio.streams.StreamWriter
- transport
- write
- writelines
- write_eof
- can_write_eof
- close
- is_closing
- wait_closed
- get_extra_info
- drain