cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.udp_efficient_streams

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = ['StreamType', 'GateSecurityPolicy', 'StreamManagerIOCoreMemoryManagement', 'UdpStreamManager', 'UdpStreamReader', 'UdpStreamReaderProtocol', 'UdpStreamWriter']
 38
 39
 40import warnings
 41import asyncio
 42from asyncio.exceptions import IncompleteReadError, LimitOverrunError
 43from asyncio import streams
 44from asyncio.streams import StreamWriter as OriginalStreamWriter
 45from asyncio.streams import StreamReader as OriginalStreamReader
 46from asyncio.streams import StreamReaderProtocol as OriginalStreamReaderProtocol
 47from asyncio import events
 48from asyncio import proactor_events
 49from asyncio import selector_events
 50try:
 51    from asyncio import unix_events
 52except ImportError:
 53    pass
 54
 55from asyncio import coroutines
 56from asyncio.tasks import sleep, Task
 57from asyncio.futures import Future
 58from copy import copy
 59from enum import Enum
 60from cengal.io.asock_io.versions.v_1.recv_buff_size_computer.recv_buff_size_computer__python import RecvBuffSizeComputer
 61# from cengal.io.asock_io.versions.v_1.base import IOCoreMemoryManagement
 62from cengal.parallel_execution.asyncio.atasks import create_task
 63from cengal.parallel_execution.asyncio.timed_yield import TimedYield
 64from cengal.hardware.info.cpu import cpu_info
 65# from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl
 66# from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty
 67# from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType
 68from cengal.data_containers.dynamic_list_of_pieces.versions.v_1.dynamic_list_of_pieces__python import DynamicListOfPiecesDequeWithLengthControl
 69from cengal.code_flow_control.smart_values.versions import ValueExistence
 70from typing import Sequence, Tuple, Type, Set, Optional, Union, List
 71from .efficient_streams_base_internal import *
 72from .efficient_streams_base import *
 73from .efficient_streams_abstract import *
 74
 75
 76class UdpStreamManager(StreamManagerAbstract):
 77    def __init__(self) -> None:
 78        self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement()
 79        self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0
 80        self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 81        self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 82
 83    async def open_connection(self, host=None, port=None, *,
 84                            loop=None, limit=DEFAULT_LIMIT,
 85                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 86                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 87                            max_message_size_len: Optional[int] = None,
 88                            **kwds):
 89        """A wrapper for create_connection() returning a (reader, writer) pair.
 90
 91        The reader returned is a UdpStreamReader instance; the writer is a
 92        UdpStreamWriter instance.
 93
 94        The arguments are all the usual arguments to create_connection()
 95        except protocol_factory; most common are positional host and port,
 96        with various optional keyword arguments following.
 97
 98        Additional optional keyword arguments are loop (to set the event loop
 99        instance to use) and limit (to set the buffer limit passed to the
100        UdpStreamReader).
101
102        (If you want to customize the UdpStreamReader and/or
103        UdpStreamReaderProtocol classes, just copy the code -- there's
104        really nothing special here except some convenience.)
105        """
106        if StreamType.gate == stream_type:
107            raise ValueError(f'Wrong stream_type value: client can not be a "gate".')
108        
109        if loop is None:
110            loop = events.get_event_loop()
111        else:
112            warnings.warn("The loop argument is deprecated since Python 3.8, "
113                        "and scheduled for removal in Python 3.10.",
114                        DeprecationWarning, stacklevel=2)
115        
116        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
117        reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
118        protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
119        transport, _ = await loop.create_connection(
120            lambda: protocol, host, port, **kwds)
121        writer = UdpStreamWriter(transport, protocol, reader, loop)
122        return reader, writer
123
124    async def start_server(self, client_connected_cb, host=None, port=None, *,
125                        loop=None, limit=DEFAULT_LIMIT,
126                        stream_type: StreamType = StreamType.general, stream_name: str = str(),
127                        gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
128                        protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
129                        max_message_size_len: Optional[int] = None,
130                        **kwds):
131        """Start a socket server, call back for each client connected.
132
133        The first parameter, `client_connected_cb`, takes two parameters:
134        client_reader, client_writer.  client_reader is a UdpStreamReader
135        object, while client_writer is a UdpStreamWriter object.  This
136        parameter can either be a plain callback function or a coroutine;
137        if it is a coroutine, it will be automatically converted into a
138        Task.
139
140        The rest of the arguments are all the usual arguments to
141        loop.create_server() except protocol_factory; most common are
142        positional host and port, with various optional keyword arguments
143        following.  The return value is the same as loop.create_server().
144
145        Additional optional keyword arguments are loop (to set the event loop
146        instance to use) and limit (to set the buffer limit passed to the
147        UdpStreamReader).
148
149        The return value is the same as loop.create_server(), i.e. a
150        Server object which can be used to stop the service.
151        """
152        if loop is None:
153            loop = events.get_event_loop()
154        else:
155            warnings.warn("The loop argument is deprecated since Python 3.8, "
156                        "and scheduled for removal in Python 3.10.",
157                        DeprecationWarning, stacklevel=2)
158
159        def factory():
160            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
161            reader =  UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
162            protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
163                                            loop=loop)
164            return protocol
165
166        return await loop.create_server(factory, host, port, **kwds)
167    
168    async def try_establish_message_protocol_server_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
169        return True
170    
171    async def try_establish_message_protocol_client_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
172        return True
173
174
class Server(events.AbstractServer):
    """Server object tracking a set of listening sockets on an event loop.

    The structure appears to be adapted from asyncio's own server class.

    NOTE(review): ``_start_serving`` is called by ``start_serving()`` and
    ``serve_forever()`` but is not defined in this class as visible here;
    confirm where it is expected to come from before relying on those methods.
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        # Futures of ``wait_closed()`` callers; set to None once woken up.
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def get_loop(self):
        """Return the event loop this server was created with."""
        return self._loop

    def is_serving(self):
        """Return the current value of the serving flag."""
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers, or ``()`` once closed."""
        if self._sockets is None:
            return ()
        # Imported locally: ``trsock`` is not imported at file level.
        from asyncio import trsock
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def _wakeup(self):
        """Resolve every pending ``wait_closed()`` future and stop tracking them."""
        waiters = self._waiters
        self._waiters = None
        if not waiters:
            return
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def close(self):
        """Stop listening on all sockets; idempotent once closed."""
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        # No active connections left: release wait_closed() callers right away.
        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        """Start accepting connections and yield to the loop once."""
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.  (``sleep`` no longer accepts ``loop=`` since 3.10.)
        await asyncio.sleep(0)

    async def serve_forever(self):
        """Serve until cancelled; close the server on cancellation.

        Raises RuntimeError if already awaited or if the server is closed.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except asyncio.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until ``close()`` has woken up the waiters.

        Returns immediately when the server is already closed or the waiters
        have already been released.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
257
258
class UdpStreamReader(OriginalStreamReader, StreamReaderAbstract):
    """StreamReader that buffers incoming data in a piece-list container.

    Incoming chunks are stored in ``self._smart_buffer`` (a container created
    from ``manager.input_from_client_container_type``) instead of the base
    class byte buffer.  Flow control can be driven by the per-stream
    ``limit`` and/or by the manager-wide incoming data size limit.
    """

    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
        """Create the reader.

        ``manager`` supplies the buffer container type and the memory
        management object; ``message_protocol_settings`` is used by
        ``read_message()`` / ``message_awailable()``.  Remaining arguments go
        to the original StreamReader constructor.
        """
        self._stream_manager = manager
        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
        super().__init__(*args, **kwargs)
        self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type(
            external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size)
        self.recv_buff_size_computer = RecvBuffSizeComputer()
        # Only the commented-out alternatives below use this value.
        cpu_info_inst = cpu_info()
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
        # self.recv_buff_size_computer.max_recv_buff_size = 3145728
        self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
        # self.recv_buff_size_computer.max_recv_buff_size = 1024
        # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
        # Flow-control switches: honour the per-stream limit and/or the
        # manager-wide incoming data size limit.
        self.limit_by_limit: bool = True
        self.limit_by_global_in__data_size_limit: bool = True

    async def read_max(self):
        """Read up to ``limit`` bytes using ``read()``."""
        return await self.read(self._limit)

    async def read_nearly_max(self):
        """Read about ``limit`` bytes using ``read_nearly()``."""
        return await self.read_nearly(self._limit)

    async def read_with_counter(self):
        """Read until EOF; return ``(data, number_of_read_max_calls)``.

        The counter includes the final call that returned an empty block.
        """
        if self._exception is not None:
            raise self._exception

        # This used to just loop creating a new waiter hoping to
        # collect everything in self._buffer, but that would
        # deadlock if the subprocess sends more than self.limit
        # bytes.  So just call self.read(self._limit) until EOF.
        blocks = []
        counter = 0
        while True:
            block = await self.read_max()
            counter += 1
            if not block:
                break
            blocks.append(block)
        return b''.join(blocks), counter

    def __repr__(self):
        info = ['UdpStreamReader']
        if self._smart_buffer.size():
            info.append(f'{self._smart_buffer.size()} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def _maybe_resume_transport(self):
        """Update the transport receive size and resume reading if allowed.

        For supported transport types the computed receive buffer size is
        written to ``transport.max_size`` (ignored when the attribute cannot
        be set).  A paused transport is resumed when any enabled limit is
        satisfied or when no limit is enabled.
        """
        supported_transports = [
            proactor_events._ProactorDatagramTransport,
            selector_events._SelectorTransport,
        ]
        # ``unix_events`` is imported under try/except ImportError at file
        # level, so the name may be missing (e.g. on Windows).
        unix_events_module = globals().get('unix_events')
        if unix_events_module is not None:
            supported_transports.append(unix_events_module._UnixReadPipeTransport)

        if isinstance(self._transport, tuple(supported_transports)):
            # if hasattr(self._transport, 'max_size'):
            try:
                self._transport.max_size = self.recv_buff_size_computer.recv_buff_size
                # print(f'max_size: {self._transport.max_size}')
            except AttributeError:
                pass
        else:
            print(f'Unsupported transport: {type(self._transport)}')

        if self._paused \
            and (
                ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \
                or (self.limit_by_limit and (not self._limit)) \
                or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \
                or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \
                or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value))
            ):
            self._paused = False
            self._transport.resume_reading()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._smart_buffer.size()

    def feed_data(self, data):
        """Append ``data`` to the buffer, wake a waiter, maybe pause reading.

        Reading is paused when the buffer exceeds twice the per-stream limit
        or the manager-wide incoming data size exceeds its limit (each check
        only when the corresponding switch is enabled).
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        data_len = len(data)
        self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
        self._smart_buffer.add_piece_of_data(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused
                and (
                    (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit)
                    or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value)))
                )):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            if self._smart_buffer.startswith(sep, e.consumed):
                self._smart_buffer.get_data(e.consumed + seplen)
            else:
                self._smart_buffer.clear()

            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception  will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = self._smart_buffer.size()

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._smart_buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._smart_buffer.get_data(isep + seplen)
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So keep reading limit-sized (or larger) blocks until EOF.
            blocks = []
            while True:
                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._smart_buffer.size() and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))

        self._maybe_resume_transport()
        return data

    async def read_nearly(self, n=-1):
        """Read approximately `n` bytes from the stream.

        Same contract as ``read()`` for ``n == 0`` and ``n < 0``.  For a
        positive ``n`` the data is taken with the buffer's
        ``get_data_nearly(n)``, so the amount returned is decided by the
        buffer container rather than being capped at exactly ``n``.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So keep reading limit-sized (or larger) blocks until EOF.
            blocks = []
            while True:
                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._smart_buffer.size() and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = self._smart_buffer.get_data_nearly(n)

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while self._smart_buffer.size() < n:
            if self._eof:
                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
                raise IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if self._smart_buffer.size() == n:
            data = self._smart_buffer.get_data(self._smart_buffer.size())
        else:
            data = self._smart_buffer.get_data(n)

        self._maybe_resume_transport()
        return data

    async def readonly_exactly(self, n):
        """Like ``readexactly()`` but takes the bytes with ``read_data()``.

        The buffer's ``read_data()`` is used instead of ``get_data()``;
        presumably this leaves the data in the buffer (peek semantics) --
        confirm against the buffer container implementation.

        Raises ValueError for negative ``n`` and IncompleteReadError if EOF
        is reached before ``n`` bytes are buffered.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while self._smart_buffer.size() < n:
            if self._eof:
                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
                raise IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if self._smart_buffer.size() == n:
            data = self._smart_buffer.read_data(self._smart_buffer.size())
        else:
            data = self._smart_buffer.read_data(n)

        self._maybe_resume_transport()
        return data

    async def read_message(self):
        """Read one length-prefixed message and return its payload.

        The prefix is ``_message_size_len`` bytes holding the payload length
        as a little-endian integer.
        """
        message_len_encoded = await self.readexactly(self._message_protocol_settings._message_size_len)
        message_len = int.from_bytes(message_len_encoded, 'little')
        return await self.readexactly(message_len)

    def message_awailable(self) -> bool:
        """Return True if a whole length-prefixed message is already buffered.

        The buffer is left untouched so that a following ``read_message()``
        still finds the length prefix.
        """
        message_size_len = self._message_protocol_settings._message_size_len
        if self._smart_buffer.size() < message_size_len:
            return False

        # Use read_data() here (as readonly_exactly() does), not get_data():
        # get_data() would remove the prefix, and the size comparison below
        # counts the prefix as still buffered.
        message_len_encoded = self._smart_buffer.read_data(message_size_len)
        message_len = int.from_bytes(message_len_encoded, 'little')
        if self._smart_buffer.size() < (message_size_len + message_len):
            return False

        return True

    def transport_pause_reading(self):
        """Pause the transport; mark the reader paused unless unsupported."""
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            pass
        else:
            self._paused = True

    def transport_resume_reading(self):
        """Clear the paused flag and resume reading on the transport."""
        self._paused = False
        self._transport.resume_reading()
691
692
class UdpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract):
    """Stream reader protocol that hands out UdpStreamWriter instances."""

    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
        """Remember the manager and protocol settings, then init the base protocol."""
        self._stream_manager = manager
        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
        super().__init__(*args, **kwargs)

    def connection_made(self, transport):
        """Attach ``transport`` to the reader and notify the connection callback.

        If the connection is to be rejected, the problem is reported through
        the loop's exception handler and the transport is aborted.  Otherwise,
        when a client-connected callback is set, a UdpStreamWriter is created
        and passed to it together with the reader; a coroutine result is
        scheduled as a task on the loop.
        """
        if self._reject_connection:
            message = ('An open stream was garbage collected prior to '
                       'establishing network connection; '
                       'call "stream.close()" explicitly.')
            context = {'message': message}
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return

        self._transport = transport
        stream_reader = self._stream_reader
        if stream_reader is not None:
            stream_reader.set_transport(transport)

        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is None:
            return

        self._stream_writer = UdpStreamWriter(transport, self, stream_reader, self._loop)
        callback_result = self._client_connected_cb(stream_reader, self._stream_writer)
        if coroutines.iscoroutine(callback_result):
            self._loop.create_task(callback_result)

        # Drop the strong reference to the reader once the callback has it.
        self._strong_reader = None
728
729
class UdpStreamWriter(OriginalStreamWriter, StreamWriterAbstract):
    """Stream writer with an outgoing queue.

    Data passed to ``optimized_write()`` is queued in ``_output_to_client``
    and pushed to the transport either explicitly (``partial_drain()`` /
    ``full_drain()``) or by a background "autonomous writer" task.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the base writer and the queue/state taken from the
        stream manager of the protocol."""
        super().__init__(*args, **kwargs)
        self._stream_manager: UdpStreamManager = self._protocol._stream_manager
        self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
            external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
        self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
        self._autonomous_writer_future: Task = None
        self._autonomous_writer_future_stop_requessted: bool = False
        # Holds (lower_water_size, future) pairs of callers waiting in
        # autonomous_writer_drain_enough().
        self._autonomous_writer_drain_enough_futures: List[Future] = list()

    def optimized_write(self, data):
        """Queue ``data`` for sending; nothing is written to the transport here."""
        self._output_to_client.add_piece_of_data(data)

    def owrite(self, data):
        """Short alias of ``optimized_write()``."""
        return self.optimized_write(data)

    async def partial_drain(self):
        """Write queued data in pieces requested with size
        ``socket_write_fixed_buffer_size.value``, draining after each piece,
        until the queue returns an empty piece."""
        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
        while another_piece_of_data:
            self.write(another_piece_of_data)
            await self.drain()
            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)

    async def pdrain(self):
        """Short alias of ``partial_drain()``."""
        return await self.partial_drain()

    async def full_drain(self):
        """Run ``partial_drain()`` and then write whatever is still queued in one piece."""
        await self.pdrain()
        rest_of_the_data_size = self._output_to_client.size()
        if rest_of_the_data_size:
            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
            self.write(another_piece_of_data)
            await self.drain()

    async def fdrain(self):
        """Short alias of ``full_drain()``."""
        return await self.full_drain()

    def _release_autonomous_writer_waiters(self):
        """Resolve waiters whose low-water mark is above the current queue size.

        Finished (including cancelled) futures are dropped; the remaining
        ones are kept for the next check.
        """
        current_size = self._output_to_client.size()
        waiters = self._autonomous_writer_drain_enough_futures
        still_waiting = type(waiters)()
        for item in waiters:
            lower_water_size, future = item
            if future.done():
                # done() is also True for cancelled futures.
                continue

            if current_size < lower_water_size:
                future.set_result(None)
            else:
                still_waiting.append(item)

        self._autonomous_writer_drain_enough_futures = still_waiting

    async def _autonomous_writer_impl(self):
        """Body of the background writer: keep sending queued pieces and
        releasing waiters until a stop is requested."""
        ty = TimedYield(0)
        while not self._autonomous_writer_future_stop_requessted:
            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
            self._release_autonomous_writer_waiters()
            while another_piece_of_data:
                self.write(another_piece_of_data)
                await self.drain()
                another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
                self._release_autonomous_writer_waiters()

            await ty()

    def start_autonomous_writer(self):
        """Create the background writer task and remember its handle."""
        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)

    def start_aw(self):
        """Short alias of ``start_autonomous_writer()``."""
        return self.start_autonomous_writer()

    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
        """Request the background writer to stop and wait for its task.

        Args:
            timeout: Seconds to wait for the task. ``0`` waits without a time
                limit; ``None`` uses the stream manager's
                ``autonomous_writer_stop_default_timeout``.

        Returns:
            The result of the writer task, or ``None`` when no writer is
            running or a stop is already in progress.

        Raises:
            asyncio.TimeoutError: the task did not finish within ``timeout``.
        """
        if timeout is None:
            timeout = self._stream_manager.autonomous_writer_stop_default_timeout

        writer_task = self._autonomous_writer_future
        if (not writer_task) or self._autonomous_writer_future_stop_requessted:
            return None

        self._autonomous_writer_future_stop_requessted = True
        try:
            if timeout:
                return await asyncio.wait_for(writer_task, timeout)

            return await writer_task
        finally:
            # Always reset the state, including on timeout or task failure.
            # Otherwise the stop flag would stay set and a later
            # start_autonomous_writer() would exit immediately.
            self._autonomous_writer_future = None
            self._autonomous_writer_future_stop_requessted = False

    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
        """Short alias of ``stop_autonomous_writer()``; see it for the
        meaning of ``timeout``."""
        return await self.stop_autonomous_writer(timeout)

    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
        """Wait until the background writer brings the queue below ``lower_water_size``.

        Returns immediately when the queue is already below the mark. When
        ``lower_water_size`` is ``None`` it defaults to three times
        ``socket_write_fixed_buffer_size.value``.
        """
        if lower_water_size is None:
            lower_water_size = self.socket_write_fixed_buffer_size.value * 3

        if lower_water_size <= self._output_to_client.size():
            future: Future = self._loop.create_future()
            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
            await future

    async def aw_drain_enough(self, lower_water_size: Optional[int] = None):
        """Short alias of ``autonomous_writer_drain_enough()``."""
        await self.autonomous_writer_drain_enough(lower_water_size)

    def optimized_write_message(self, data):
        """Queue ``data`` prefixed with its length, encoded little-endian in
        ``_message_size_len`` bytes taken from the protocol's message settings."""
        size_len = self._protocol._message_protocol_settings._message_size_len
        self.optimized_write(len(data).to_bytes(size_len, 'little') + data)

    def owrite_message(self, data):
        """Short alias of ``optimized_write_message()``."""
        self.optimized_write_message(data)

    async def send_message(self, data):
        """Queue ``data`` as a length-prefixed message and fully drain the queue."""
        self.optimized_write_message(data)
        await self.fdrain()
class StreamType(enum.Enum):
class StreamType(Enum):
    """Stream kinds accepted by the ``stream_type`` argument of the stream manager."""

    general = 0
    message_based_anonymous = 1
    message_based_names = 2
    # open_connection() rejects this value: a client can not be a "gate".
    gate = 3

An enumeration.

general = <StreamType.general: 0>
message_based_anonymous = <StreamType.message_based_anonymous: 1>
message_based_names = <StreamType.message_based_names: 2>
gate = <StreamType.gate: 3>
Inherited Members
enum.Enum
name
value
class GateSecurityPolicy(enum.Enum):
class GateSecurityPolicy(Enum):
    """Policy selecting how a gate treats the stream names it manages."""

    # The gate will either allow only, or ban, the selected stream names
    # that try to connect to it.
    allowed = 0
    disabled = 1

An enumeration.

allowed = <GateSecurityPolicy.allowed: 0>
disabled = <GateSecurityPolicy.disabled: 1>
Inherited Members
enum.Enum
name
value
class StreamManagerIOCoreMemoryManagement(cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement):
class StreamManagerIOCoreMemoryManagement(IOCoreMemoryManagement):
    """``IOCoreMemoryManagement`` extended with ``socket_write_fixed_buffer_size``."""

    def __init__(self):
        super().__init__()

        # Initial value: the per-virtual-core L2 cache size reported by cpu_info().
        per_core_l2_size = cpu_info().l2_cache_size_per_virtual_core
        self.socket_write_fixed_buffer_size = ValueExistence(True, per_core_l2_size)

    def link_to(self, parent):
        """Link to ``parent`` and share its ``socket_write_fixed_buffer_size``
        object when the parent has one."""
        super().link_to(parent)
        try:
            self.socket_write_fixed_buffer_size = parent.socket_write_fixed_buffer_size
        except AttributeError:
            # The parent has no such attribute: keep our own value.
            pass
socket_write_fixed_buffer_size
Inherited Members
cengal.io.core.memory_management.versions.v_0.memory_management.IOCoreMemoryManagement
global__data_size_limit
global__data_full_size
global__deletable_data_full_size
global_other__data_size_limit
global_other__data_full_size
global_other__deletable_data_full_size
global_in__data_size_limit
global_in__data_full_size
global_in__deletable_data_full_size
global_out__data_size_limit
global_out__data_full_size
global_out__deletable_data_full_size
class UdpStreamManager(cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamManagerAbstract):
 77class UdpStreamManager(StreamManagerAbstract):
 78    def __init__(self) -> None:
 79        self.io_memory_management: StreamManagerIOCoreMemoryManagement = StreamManagerIOCoreMemoryManagement()
 80        self.autonomous_writer_stop_default_timeout: Optional[Union[int, float]] = 10.0
 81        self.output_to_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 82        self.input_from_client_container_type = DynamicListOfPiecesDequeWithLengthControl
 83
 84    async def open_connection(self, host=None, port=None, *,
 85                            loop=None, limit=DEFAULT_LIMIT,
 86                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 87                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 88                            max_message_size_len: Optional[int] = None,
 89                            **kwds):
 90        """A wrapper for create_connection() returning a (reader, writer) pair.
 91
 92        The reader returned is a UdpStreamReader instance; the writer is a
 93        UdpStreamWriter instance.
 94
 95        The arguments are all the usual arguments to create_connection()
 96        except protocol_factory; most common are positional host and port,
 97        with various optional keyword arguments following.
 98
 99        Additional optional keyword arguments are loop (to set the event loop
100        instance to use) and limit (to set the buffer limit passed to the
101        UdpStreamReader).
102
103        (If you want to customize the UdpStreamReader and/or
104        UdpStreamReaderProtocol classes, just copy the code -- there's
105        really nothing special here except some convenience.)
106        """
107        if StreamType.gate == stream_type:
108            raise ValueError(f'Wrong stream_type value: client can not be a "gate".')
109        
110        if loop is None:
111            loop = events.get_event_loop()
112        else:
113            warnings.warn("The loop argument is deprecated since Python 3.8, "
114                        "and scheduled for removal in Python 3.10.",
115                        DeprecationWarning, stacklevel=2)
116        
117        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
118        reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
119        protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
120        transport, _ = await loop.create_connection(
121            lambda: protocol, host, port, **kwds)
122        writer = UdpStreamWriter(transport, protocol, reader, loop)
123        return reader, writer
124
125    async def start_server(self, client_connected_cb, host=None, port=None, *,
126                        loop=None, limit=DEFAULT_LIMIT,
127                        stream_type: StreamType = StreamType.general, stream_name: str = str(),
128                        gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
129                        protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
130                        max_message_size_len: Optional[int] = None,
131                        **kwds):
132        """Start a socket server, call back for each client connected.
133
134        The first parameter, `client_connected_cb`, takes two parameters:
135        client_reader, client_writer.  client_reader is a UdpStreamReader
136        object, while client_writer is a UdpStreamWriter object.  This
137        parameter can either be a plain callback function or a coroutine;
138        if it is a coroutine, it will be automatically converted into a
139        Task.
140
141        The rest of the arguments are all the usual arguments to
142        loop.create_server() except protocol_factory; most common are
143        positional host and port, with various optional keyword arguments
144        following.  The return value is the same as loop.create_server().
145
146        Additional optional keyword arguments are loop (to set the event loop
147        instance to use) and limit (to set the buffer limit passed to the
148        UdpStreamReader).
149
150        The return value is the same as loop.create_server(), i.e. a
151        Server object which can be used to stop the service.
152        """
153        if loop is None:
154            loop = events.get_event_loop()
155        else:
156            warnings.warn("The loop argument is deprecated since Python 3.8, "
157                        "and scheduled for removal in Python 3.10.",
158                        DeprecationWarning, stacklevel=2)
159
160        def factory():
161            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
162            reader =  UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
163            protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
164                                            loop=loop)
165            return protocol
166
167        return await loop.create_server(factory, host, port, **kwds)
168    
169    async def try_establish_message_protocol_server_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
170        return True
171    
172    async def try_establish_message_protocol_client_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
173        return True
io_memory_management: StreamManagerIOCoreMemoryManagement
autonomous_writer_stop_default_timeout: Union[int, float, NoneType]
output_to_client_container_type
input_from_client_container_type
async def open_connection( self, host=None, port=None, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds):
 84    async def open_connection(self, host=None, port=None, *,
 85                            loop=None, limit=DEFAULT_LIMIT,
 86                            stream_type: StreamType = StreamType.general, stream_name: str = str(),
 87                            protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
 88                            max_message_size_len: Optional[int] = None,
 89                            **kwds):
 90        """A wrapper for create_connection() returning a (reader, writer) pair.
 91
 92        The reader returned is a UdpStreamReader instance; the writer is a
 93        UdpStreamWriter instance.
 94
 95        The arguments are all the usual arguments to create_connection()
 96        except protocol_factory; most common are positional host and port,
 97        with various optional keyword arguments following.
 98
 99        Additional optional keyword arguments are loop (to set the event loop
100        instance to use) and limit (to set the buffer limit passed to the
101        UdpStreamReader).
102
103        (If you want to customize the UdpStreamReader and/or
104        UdpStreamReaderProtocol classes, just copy the code -- there's
105        really nothing special here except some convenience.)
106        """
107        if StreamType.gate == stream_type:
108            raise ValueError(f'Wrong stream_type value: client can not be a "gate".')
109        
110        if loop is None:
111            loop = events.get_event_loop()
112        else:
113            warnings.warn("The loop argument is deprecated since Python 3.8, "
114                        "and scheduled for removal in Python 3.10.",
115                        DeprecationWarning, stacklevel=2)
116        
117        message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
118        reader = UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
119        protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, loop=loop)
120        transport, _ = await loop.create_connection(
121            lambda: protocol, host, port, **kwds)
122        writer = UdpStreamWriter(transport, protocol, reader, loop)
123        return reader, writer

A wrapper for create_connection() returning a (reader, writer) pair.

The reader returned is a UdpStreamReader instance; the writer is a UdpStreamWriter instance.

The arguments are all the usual arguments to create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the UdpStreamReader).

(If you want to customize the UdpStreamReader and/or UdpStreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.)

async def start_server( self, client_connected_cb, host=None, port=None, *, loop=None, limit=10485760, stream_type: StreamType = <StreamType.general: 0>, stream_name: str = '', gate_security_policy: GateSecurityPolicy = <GateSecurityPolicy.disabled: 1>, policy_managed_stream_names: Union[Set[str], NoneType] = None, protocol_greeting: Union[str, NoneType] = None, message_size_len: Union[int, NoneType] = None, max_message_size_len: Union[int, NoneType] = None, **kwds):
125    async def start_server(self, client_connected_cb, host=None, port=None, *,
126                        loop=None, limit=DEFAULT_LIMIT,
127                        stream_type: StreamType = StreamType.general, stream_name: str = str(),
128                        gate_security_policy: GateSecurityPolicy = GateSecurityPolicy.disabled, policy_managed_stream_names: Optional[Set[str]] = None,
129                        protocol_greeting: Optional[str] = None, message_size_len: Optional[int] = None,
130                        max_message_size_len: Optional[int] = None,
131                        **kwds):
132        """Start a socket server, call back for each client connected.
133
134        The first parameter, `client_connected_cb`, takes two parameters:
135        client_reader, client_writer.  client_reader is a UdpStreamReader
136        object, while client_writer is a UdpStreamWriter object.  This
137        parameter can either be a plain callback function or a coroutine;
138        if it is a coroutine, it will be automatically converted into a
139        Task.
140
141        The rest of the arguments are all the usual arguments to
142        loop.create_server() except protocol_factory; most common are
143        positional host and port, with various optional keyword arguments
144        following.  The return value is the same as loop.create_server().
145
146        Additional optional keyword arguments are loop (to set the event loop
147        instance to use) and limit (to set the buffer limit passed to the
148        UdpStreamReader).
149
150        The return value is the same as loop.create_server(), i.e. a
151        Server object which can be used to stop the service.
152        """
153        if loop is None:
154            loop = events.get_event_loop()
155        else:
156            warnings.warn("The loop argument is deprecated since Python 3.8, "
157                        "and scheduled for removal in Python 3.10.",
158                        DeprecationWarning, stacklevel=2)
159
160        def factory():
161            message_protocol_settings: MessageProtocolSettings = MessageProtocolSettings(protocol_greeting, message_size_len, max_message_size_len)
162            reader =  UdpStreamReader(self, message_protocol_settings, limit=limit, loop=loop)
163            protocol = UdpStreamReaderProtocol(self, message_protocol_settings, reader, client_connected_cb,
164                                            loop=loop)
165            return protocol
166
167        return await loop.create_server(factory, host, port, **kwds)

Start a socket server, call back for each client connected.

The first parameter, client_connected_cb, takes two parameters: client_reader, client_writer. client_reader is a UdpStreamReader object, while client_writer is a UdpStreamWriter object. This parameter can either be a plain callback function or a coroutine; if it is a coroutine, it will be automatically converted into a Task.

The rest of the arguments are all the usual arguments to loop.create_server() except protocol_factory; most common are positional host and port, with various optional keyword arguments following. The return value is the same as loop.create_server().

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the UdpStreamReader).

The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service.

async def try_establish_message_protocol_server_side( self, reader: UdpStreamReader, writer: UdpStreamWriter) -> bool:
169    async def try_establish_message_protocol_server_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
170        return True
async def try_establish_message_protocol_client_side( self, reader: UdpStreamReader, writer: UdpStreamWriter) -> bool:
172    async def try_establish_message_protocol_client_side(self, reader: 'UdpStreamReader', writer: 'UdpStreamWriter') -> bool:
173        return True
class UdpStreamReader(asyncio.streams.StreamReader, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderAbstract):
260class UdpStreamReader(OriginalStreamReader, StreamReaderAbstract):
261    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
262        self._stream_manager = manager
263        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
264        super().__init__(*args, **kwargs)
265        self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type(
266            external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size)
267        self.recv_buff_size_computer = RecvBuffSizeComputer()
268        cpu_info_inst = cpu_info()
269        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
270        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
271        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
272        # self.recv_buff_size_computer.max_recv_buff_size = 3145728
273        self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
274        # self.recv_buff_size_computer.max_recv_buff_size = 1024
275        # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
276        self.limit_by_limit: bool = True
277        self.limit_by_global_in__data_size_limit: bool = True
278    
279    async def read_max(self):
280        return await self.read(self._limit)
281    
282    async def read_nearly_max(self):
283        return await self.read_nearly(self._limit)
284    
285    async def read_with_counter(self):
286        if self._exception is not None:
287            raise self._exception
288
289        # This used to just loop creating a new waiter hoping to
290        # collect everything in self._buffer, but that would
291        # deadlock if the subprocess sends more than self.limit
292        # bytes.  So just call self.read(self._limit) until EOF.
293        blocks = []
294        counter = 0
295        while True:
296            block = await self.read_max()
297            counter += 1
298            if not block:
299                break
300            blocks.append(block)
301        return b''.join(blocks), counter
302
303    def __repr__(self):
304        info = ['UdpStreamReader']
305        if self._smart_buffer.size():
306            info.append(f'{self._smart_buffer.size()} bytes')
307        if self._eof:
308            info.append('eof')
309        if self._limit != DEFAULT_LIMIT:
310            info.append(f'limit={self._limit}')
311        if self._waiter:
312            info.append(f'waiter={self._waiter!r}')
313        if self._exception:
314            info.append(f'exception={self._exception!r}')
315        if self._transport:
316            info.append(f'transport={self._transport!r}')
317        if self._paused:
318            info.append('paused')
319        return '<{}>'.format(' '.join(info))
320
321    def _maybe_resume_transport(self):
322        if isinstance(self._transport, (
323            proactor_events._ProactorDatagramTransport,
324            selector_events._SelectorTransport,
325            unix_events._UnixReadPipeTransport
326            )):
327            # if hasattr(self._transport, 'max_size'):
328            try:
329                self._transport.max_size = self.recv_buff_size_computer.recv_buff_size
330                # print(f'max_size: {self._transport.max_size}')
331            except AttributeError:
332                pass
333        else:
334            print(f'Unsupported transport: {type(self._transport)}')
335        
336        if self._paused \
337            and (
338                ((not self.limit_by_limit) and (not self.limit_by_global_in__data_size_limit)) \
339                or (self.limit_by_limit and (not self._limit)) \
340                or (self.limit_by_limit and (self._smart_buffer.size() <= self._limit)) \
341                or (self.limit_by_global_in__data_size_limit and (not self._stream_manager.io_memory_management.global_in__data_size_limit)) \
342                or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value <= self._stream_manager.io_memory_management.global_in__data_size_limit.value))
343            ):
344            self._paused = False
345            self._transport.resume_reading()
346
347    def at_eof(self):
348        """Return True if the buffer is empty and 'feed_eof' was called."""
349        return self._eof and not self._smart_buffer.size()
350
351    def feed_data(self, data):
352        assert not self._eof, 'feed_data after feed_eof'
353
354        if not data:
355            return
356
357        data_len = len(data)
358        self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
359        self._smart_buffer.add_piece_of_data(data)
360        self._wakeup_waiter()
361
362        if (self._transport is not None and
363                not self._paused 
364                and (
365                    (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit)
366                    or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value)))
367                )):
368            try:
369                self._transport.pause_reading()
370            except NotImplementedError:
371                # The transport can't be paused.
372                # We'll just have to buffer all data.
373                # Forget the transport so we don't keep trying.
374                self._transport = None
375            else:
376                self._paused = True
377
378    async def readline(self):
379        """Read chunk of data from the stream until newline (b'\n') is found.
380
381        On success, return chunk that ends with newline. If only partial
382        line can be read due to EOF, return incomplete line without
383        terminating newline. When EOF was reached while no bytes read, empty
384        bytes object is returned.
385
386        If limit is reached, ValueError will be raised. In that case, if
387        newline was found, complete line including newline will be removed
388        from internal buffer. Else, internal buffer will be cleared. Limit is
389        compared against part of the line without newline.
390
391        If stream was paused, this function will automatically resume it if
392        needed.
393        """
394        sep = b'\n'
395        seplen = len(sep)
396        try:
397            line = await self.readuntil(sep)
398        except IncompleteReadError as e:
399            return e.partial
400        except LimitOverrunError as e:
401            if self._smart_buffer.startswith(sep, e.consumed):
402                self._smart_buffer.get_data(e.consumed + seplen)
403            else:
404                self._smart_buffer.clear()
405            
406            self._maybe_resume_transport()
407            raise ValueError(e.args[0])
408        return line
409
410    async def readuntil(self, separator=b'\n'):
411        """Read data from the stream until ``separator`` is found.
412
413        On success, the data and separator will be removed from the
414        internal buffer (consumed). Returned data will include the
415        separator at the end.
416
417        Configured stream limit is used to check result. Limit sets the
418        maximal length of data that can be returned, not counting the
419        separator.
420
421        If an EOF occurs and the complete separator is still not found,
422        an IncompleteReadError exception will be raised, and the internal
423        buffer will be reset.  The IncompleteReadError.partial attribute
424        may contain the separator partially.
425
426        If the data cannot be read because of over limit, a
427        LimitOverrunError exception  will be raised, and the data
428        will be left in the internal buffer, so it can be read again.
429        """
430        seplen = len(separator)
431        if seplen == 0:
432            raise ValueError('Separator should be at least one-byte string')
433
434        if self._exception is not None:
435            raise self._exception
436
437        # Consume whole buffer except last bytes, which length is
438        # one less than seplen. Let's check corner cases with
439        # separator='SEPARATOR':
440        # * we have received almost complete separator (without last
441        #   byte). i.e buffer='some textSEPARATO'. In this case we
442        #   can safely consume len(separator) - 1 bytes.
443        # * last byte of buffer is first byte of separator, i.e.
444        #   buffer='abcdefghijklmnopqrS'. We may safely consume
445        #   everything except that last byte, but this require to
446        #   analyze bytes of buffer that match partial separator.
447        #   This is slow and/or require FSM. For this case our
448        #   implementation is not optimal, since require rescanning
449        #   of data that is known to not belong to separator. In
450        #   real world, separator will not be so long to notice
451        #   performance problems. Even when reading MIME-encoded
452        #   messages :)
453
454        # `offset` is the number of bytes from the beginning of the buffer
455        # where there is no occurrence of `separator`.
456        offset = 0
457
458        # Loop until we find `separator` in the buffer, exceed the buffer size,
459        # or an EOF has happened.
460        while True:
461            buflen = self._smart_buffer.size()
462
463            # Check if we now have enough data in the buffer for `separator` to
464            # fit.
465            if buflen - offset >= seplen:
466                isep = self._smart_buffer.find(separator, offset)
467
468                if isep != -1:
469                    # `separator` is in the buffer. `isep` will be used later
470                    # to retrieve the data.
471                    break
472
473                # see upper comment for explanation.
474                offset = buflen + 1 - seplen
475                if offset > self._limit:
476                    raise LimitOverrunError(
477                        'Separator is not found, and chunk exceed the limit',
478                        offset)
479
480            # Complete message (with full separator) may be present in buffer
481            # even when EOF flag is set. This may happen when the last chunk
482            # adds data which makes separator be found. That's why we check for
483            # EOF *after* inspecting the buffer.
484            if self._eof:
485                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
486                raise IncompleteReadError(chunk, None)
487
488            # _wait_for_data() will resume reading if stream was paused.
489            await self._wait_for_data('readuntil')
490
491        if isep > self._limit:
492            raise LimitOverrunError(
493                'Separator is found, but chunk is longer than limit', isep)
494
495        chunk = self._smart_buffer.get_data(isep + seplen)
496        self._maybe_resume_transport()
497        return bytes(chunk)
498
499    async def read(self, n=-1):
500        """Read up to `n` bytes from the stream.
501
502        If n is not provided, or set to -1, read until EOF and return all read
503        bytes. If the EOF was received and the internal buffer is empty, return
504        an empty bytes object.
505
506        If n is zero, return empty bytes object immediately.
507
508        If n is positive, this function try to read `n` bytes, and may return
509        less or equal bytes than requested, but at least one byte. If EOF was
510        received before any byte is read, this function returns empty byte
511        object.
512
513        Returned value is not limited with limit, configured at stream
514        creation.
515
516        If stream was paused, this function will automatically resume it if
517        needed.
518        """
519
520        if self._exception is not None:
521            raise self._exception
522
523        if n == 0:
524            return b''
525
526        if n < 0:
527            # This used to just loop creating a new waiter hoping to
528            # collect everything in self._buffer, but that would
529            # deadlock if the subprocess sends more than self.limit
530            # bytes.  So just call self.read(self._limit) until EOF.
531            blocks = []
532            while True:
533                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
534                if not block:
535                    break
536                blocks.append(block)
537            return b''.join(blocks)
538
539        if not self._smart_buffer.size() and not self._eof:
540            await self._wait_for_data('read')
541
542        # This will work right even if buffer is less than n bytes
543        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))
544
545        self._maybe_resume_transport()
546        return data
547
548    async def read_nearly(self, n=-1):
549        """Read up to `n` bytes from the stream.
550
551        If n is not provided, or set to -1, read until EOF and return all read
552        bytes. If the EOF was received and the internal buffer is empty, return
553        an empty bytes object.
554
555        If n is zero, return empty bytes object immediately.
556
557        If n is positive, this function try to read `n` bytes, and may return
558        less or equal bytes than requested, but at least one byte. If EOF was
559        received before any byte is read, this function returns empty byte
560        object.
561
562        Returned value is not limited with limit, configured at stream
563        creation.
564
565        If stream was paused, this function will automatically resume it if
566        needed.
567        """
568
569        if self._exception is not None:
570            raise self._exception
571
572        if n == 0:
573            return b''
574
575        if n < 0:
576            # This used to just loop creating a new waiter hoping to
577            # collect everything in self._buffer, but that would
578            # deadlock if the subprocess sends more than self.limit
579            # bytes.  So just call self.read(self._limit) until EOF.
580            blocks = []
581            while True:
582                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
583                if not block:
584                    break
585                blocks.append(block)
586            return b''.join(blocks)
587
588        if not self._smart_buffer.size() and not self._eof:
589            await self._wait_for_data('read')
590
591        # This will work right even if buffer is less than n bytes
592        data = self._smart_buffer.get_data_nearly(n)
593
594        self._maybe_resume_transport()
595        return data
596
597    async def readexactly(self, n):
598        """Read exactly `n` bytes.
599
600        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
601        read. The IncompleteReadError.partial attribute of the exception will
602        contain the partial read bytes.
603
604        if n is zero, return empty bytes object.
605
606        Returned value is not limited with limit, configured at stream
607        creation.
608
609        If stream was paused, this function will automatically resume it if
610        needed.
611        """
612        if n < 0:
613            raise ValueError('readexactly size can not be less than zero')
614
615        if self._exception is not None:
616            raise self._exception
617
618        if n == 0:
619            return b''
620
621        while self._smart_buffer.size() < n:
622            if self._eof:
623                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
624                raise IncompleteReadError(incomplete, n)
625
626            await self._wait_for_data('readexactly')
627
628        if self._smart_buffer.size() == n:
629            data = self._smart_buffer.get_data(self._smart_buffer.size())
630        else:
631            data = self._smart_buffer.get_data(n)
632
633        self._maybe_resume_transport()
634        return data
635    
636    async def readonly_exactly(self, n):
637        if n < 0:
638            raise ValueError('readexactly size can not be less than zero')
639
640        if self._exception is not None:
641            raise self._exception
642
643        if n == 0:
644            return b''
645
646        while self._smart_buffer.size() < n:
647            if self._eof:
648                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
649                raise IncompleteReadError(incomplete, n)
650
651            await self._wait_for_data('readexactly')
652
653        if self._smart_buffer.size() == n:
654            data = self._smart_buffer.read_data(self._smart_buffer.size())
655        else:
656            data = self._smart_buffer.read_data(n)
657
658        self._maybe_resume_transport()
659        return data
660    
661    async def read_message(self):
662        message_len_encoded = await self.readexactly(self._message_protocol_settings._message_size_len)
663        message_len = int.from_bytes(message_len_encoded, 'little')
664        return await self.readexactly(message_len)
665    
666    def message_awailable(self) -> bool:
667        message_size_len = self._message_protocol_settings._message_size_len
668        if self._smart_buffer.size() < message_size_len:
669            return False
670
671        message_len_encoded = self._smart_buffer.get_data(message_size_len)
672        message_len = int.from_bytes(message_len_encoded, 'little')
673        if self._smart_buffer.size() < (message_size_len + message_len):
674            return False
675        
676        return True
677    
678    def transport_pause_reading(self):
679        try:
680            self._transport.pause_reading()
681        except NotImplementedError:
682            # The transport can't be paused.
683            # We'll just have to buffer all data.
684            # Forget the transport so we don't keep trying.
685            pass
686        else:
687            self._paused = True
688    
689    def transport_resume_reading(self):
690        self._paused = False
691        self._transport.resume_reading()
UdpStreamReader( manager: UdpStreamManager, message_protocol_settings: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_base_internal.MessageProtocolSettings, *args, **kwargs)
    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
        """Create a reader bound to `manager`.

        Args:
            manager: stream manager; supplies the buffer container type
                (``input_from_client_container_type``) and the shared
                ``io_memory_management`` counters.
            message_protocol_settings: settings read by the message helpers
                (``_message_size_len`` is the length-prefix size).
            *args, **kwargs: forwarded unchanged to the base reader's
                ``__init__``.
        """
        # Both attributes are assigned before the base initializer runs.
        self._stream_manager = manager
        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
        super().__init__(*args, **kwargs)
        # Buffer used by every read method of this class. It receives the
        # manager's global input-size counter as `external_data_length`.
        self._smart_buffer: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.input_from_client_container_type(
            external_data_length=self._stream_manager.io_memory_management.global_in__data_full_size)
        self.recv_buff_size_computer = RecvBuffSizeComputer()
        # NOTE(review): `cpu_info_inst` is referenced only by the commented-out
        # alternatives below; the active setting is a fixed 10 MiB.
        cpu_info_inst = cpu_info()
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l2_cache_size_per_virtual_core
        # self.recv_buff_size_computer.max_recv_buff_size = cpu_info_inst.l3_cache_size_per_virtual_core
        # self.recv_buff_size_computer.max_recv_buff_size = 3145728
        self.recv_buff_size_computer.max_recv_buff_size = 10 * 1024**2
        # self.recv_buff_size_computer.max_recv_buff_size = 1024
        # print(f'max_recv_buff_size: {self.recv_buff_size_computer.max_recv_buff_size}')
        # Switches for the two back-pressure conditions checked in feed_data().
        self.limit_by_limit: bool = True
        self.limit_by_global_in__data_size_limit: bool = True
recv_buff_size_computer
limit_by_limit: bool
limit_by_global_in__data_size_limit: bool
async def read_max(self):
279    async def read_max(self):
280        return await self.read(self._limit)
async def read_nearly_max(self):
282    async def read_nearly_max(self):
283        return await self.read_nearly(self._limit)
async def read_with_counter(self):
285    async def read_with_counter(self):
286        if self._exception is not None:
287            raise self._exception
288
289        # This used to just loop creating a new waiter hoping to
290        # collect everything in self._buffer, but that would
291        # deadlock if the subprocess sends more than self.limit
292        # bytes.  So just call self.read(self._limit) until EOF.
293        blocks = []
294        counter = 0
295        while True:
296            block = await self.read_max()
297            counter += 1
298            if not block:
299                break
300            blocks.append(block)
301        return b''.join(blocks), counter
def at_eof(self):
347    def at_eof(self):
348        """Return True if the buffer is empty and 'feed_eof' was called."""
349        return self._eof and not self._smart_buffer.size()

Return True if the buffer is empty and 'feed_eof' was called.

def feed_data(self, data):
    def feed_data(self, data):
        """Append received `data` to the buffer and apply back-pressure.

        Empty `data` is ignored. Otherwise the receive-buffer size computer is
        updated with the piece length, the piece is stored, and a waiting
        reader is woken up.

        The transport is then paused when it is attached, not already paused,
        and at least one enabled condition holds:

        * ``limit_by_limit`` and the buffer holds more than twice
          ``self._limit`` bytes;
        * ``limit_by_global_in__data_size_limit`` and the manager's global
          input size exceeds its global limit.

        Must not be called after EOF was fed (enforced by an assert).
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        data_len = len(data)
        self.recv_buff_size_computer.calc_new_recv_buff_size(data_len)
        self._smart_buffer.add_piece_of_data(data)
        self._wakeup_waiter()

        # The inner expression has the shape `(a and b or (c and d))`; since
        # `and` binds tighter than `or` it evaluates as `(a and b) or (c and d)`.
        if (self._transport is not None and
                not self._paused 
                and (
                    (self.limit_by_limit and (self._smart_buffer.size() > 2 * self._limit)
                    or (self.limit_by_global_in__data_size_limit and (self._stream_manager.io_memory_management.global_in__data_full_size.value > self._stream_manager.io_memory_management.global_in__data_size_limit.value)))
                )):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True
async def readline(self):
    async def readline(self):
        """Read chunk of data from the stream until newline (b'\n') is found.

        On success, return chunk that ends with newline. If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline. When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised. In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer. Else, internal buffer will be cleared. Limit is
        compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except IncompleteReadError as e:
            # EOF before a newline: hand back whatever readuntil() collected.
            return e.partial
        except LimitOverrunError as e:
            # `e.consumed` is the offset readuntil() reported. If the newline
            # sits exactly there, drop the over-long line together with it;
            # otherwise no newline was found and the whole buffer is dropped.
            if self._smart_buffer.startswith(sep, e.consumed):
                self._smart_buffer.get_data(e.consumed + seplen)
            else:
                self._smart_buffer.clear()
            
            self._maybe_resume_transport()
            # The limit overrun is reported to the caller as ValueError
            # carrying the original message.
            raise ValueError(e.args[0])
        return line

Read chunk of data from the stream until newline (b'\n') is found.

    On success, return chunk that ends with newline. If only partial
    line can be read due to EOF, return incomplete line without
    terminating newline. When EOF was reached while no bytes read, empty
    bytes object is returned.

    If limit is reached, ValueError will be raised. In that case, if
    newline was found, complete line including newline will be removed
    from internal buffer. Else, internal buffer will be cleared. Limit is
    compared against part of the line without newline.

    If stream was paused, this function will automatically resume it if
    needed.
async def readuntil(self, separator=b'\n'):
    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception  will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        Raises ValueError when ``separator`` is empty. A stored stream
        exception is re-raised before the buffer is inspected.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM. For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator. In
        #   real world, separator will not be so long to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = self._smart_buffer.size()

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._smart_buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                # Everything still buffered is taken out and reported as the
                # partial result.
                chunk = self._smart_buffer.get_data(self._smart_buffer.size())
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        # The separator was found, but the data in front of it is longer than
        # the limit; nothing is consumed in this case.
        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._smart_buffer.get_data(isep + seplen)
        self._maybe_resume_transport()
        # The result is always converted to an immutable bytes object.
        return bytes(chunk)

Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

Configured stream limit is used to check result. Limit sets the maximal length of data that can be returned, not counting the separator.

If an EOF occurs and the complete separator is still not found, an IncompleteReadError exception will be raised, and the internal buffer will be reset. The IncompleteReadError.partial attribute may contain the separator partially.

If the data cannot be read because of over limit, a LimitOverrunError exception will be raised, and the data will be left in the internal buffer, so it can be read again.

async def read(self, n=-1):
499    async def read(self, n=-1):
500        """Read up to `n` bytes from the stream.
501
502        If n is not provided, or set to -1, read until EOF and return all read
503        bytes. If the EOF was received and the internal buffer is empty, return
504        an empty bytes object.
505
506        If n is zero, return empty bytes object immediately.
507
508        If n is positive, this function try to read `n` bytes, and may return
509        less or equal bytes than requested, but at least one byte. If EOF was
510        received before any byte is read, this function returns empty byte
511        object.
512
513        Returned value is not limited with limit, configured at stream
514        creation.
515
516        If stream was paused, this function will automatically resume it if
517        needed.
518        """
519
520        if self._exception is not None:
521            raise self._exception
522
523        if n == 0:
524            return b''
525
526        if n < 0:
527            # This used to just loop creating a new waiter hoping to
528            # collect everything in self._buffer, but that would
529            # deadlock if the subprocess sends more than self.limit
530            # bytes.  So just call self.read(self._limit) until EOF.
531            blocks = []
532            while True:
533                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
534                if not block:
535                    break
536                blocks.append(block)
537            return b''.join(blocks)
538
539        if not self._smart_buffer.size() and not self._eof:
540            await self._wait_for_data('read')
541
542        # This will work right even if buffer is less than n bytes
543        data = self._smart_buffer.get_data(min(n, self._smart_buffer.size()))
544
545        self._maybe_resume_transport()
546        return data

Read up to n bytes from the stream.

If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return empty bytes object immediately.

If n is positive, this function try to read n bytes, and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object.

Returned value is not limited with limit, configured at stream creation.

If stream was paused, this function will automatically resume it if needed.

async def read_nearly(self, n=-1):
548    async def read_nearly(self, n=-1):
549        """Read up to `n` bytes from the stream.
550
551        If n is not provided, or set to -1, read until EOF and return all read
552        bytes. If the EOF was received and the internal buffer is empty, return
553        an empty bytes object.
554
555        If n is zero, return empty bytes object immediately.
556
557        If n is positive, this function try to read `n` bytes, and may return
558        less or equal bytes than requested, but at least one byte. If EOF was
559        received before any byte is read, this function returns empty byte
560        object.
561
562        Returned value is not limited with limit, configured at stream
563        creation.
564
565        If stream was paused, this function will automatically resume it if
566        needed.
567        """
568
569        if self._exception is not None:
570            raise self._exception
571
572        if n == 0:
573            return b''
574
575        if n < 0:
576            # This used to just loop creating a new waiter hoping to
577            # collect everything in self._buffer, but that would
578            # deadlock if the subprocess sends more than self.limit
579            # bytes.  So just call self.read(self._limit) until EOF.
580            blocks = []
581            while True:
582                block = await self.read_nearly(max(self._limit, self._smart_buffer.size()))
583                if not block:
584                    break
585                blocks.append(block)
586            return b''.join(blocks)
587
588        if not self._smart_buffer.size() and not self._eof:
589            await self._wait_for_data('read')
590
591        # This will work right even if buffer is less than n bytes
592        data = self._smart_buffer.get_data_nearly(n)
593
594        self._maybe_resume_transport()
595        return data

Read up to n bytes from the stream.

If n is not provided, or set to -1, read until EOF and return all read bytes. If the EOF was received and the internal buffer is empty, return an empty bytes object.

If n is zero, return empty bytes object immediately.

If n is positive, this function try to read n bytes, and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object.

Returned value is not limited with limit, configured at stream creation.

If stream was paused, this function will automatically resume it if needed.

async def readexactly(self, n):
597    async def readexactly(self, n):
598        """Read exactly `n` bytes.
599
600        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
601        read. The IncompleteReadError.partial attribute of the exception will
602        contain the partial read bytes.
603
604        if n is zero, return empty bytes object.
605
606        Returned value is not limited with limit, configured at stream
607        creation.
608
609        If stream was paused, this function will automatically resume it if
610        needed.
611        """
612        if n < 0:
613            raise ValueError('readexactly size can not be less than zero')
614
615        if self._exception is not None:
616            raise self._exception
617
618        if n == 0:
619            return b''
620
621        while self._smart_buffer.size() < n:
622            if self._eof:
623                incomplete = self._smart_buffer.get_data(self._smart_buffer.size())
624                raise IncompleteReadError(incomplete, n)
625
626            await self._wait_for_data('readexactly')
627
628        if self._smart_buffer.size() == n:
629            data = self._smart_buffer.get_data(self._smart_buffer.size())
630        else:
631            data = self._smart_buffer.get_data(n)
632
633        self._maybe_resume_transport()
634        return data

Read exactly n bytes.

Raise an IncompleteReadError if EOF is reached before n bytes can be read. The IncompleteReadError.partial attribute of the exception will contain the partial read bytes.

if n is zero, return empty bytes object.

Returned value is not limited with limit, configured at stream creation.

If stream was paused, this function will automatically resume it if needed.

async def readonly_exactly(self, n):
636    async def readonly_exactly(self, n):
637        if n < 0:
638            raise ValueError('readexactly size can not be less than zero')
639
640        if self._exception is not None:
641            raise self._exception
642
643        if n == 0:
644            return b''
645
646        while self._smart_buffer.size() < n:
647            if self._eof:
648                incomplete = self._smart_buffer.read_data(self._smart_buffer.size())
649                raise IncompleteReadError(incomplete, n)
650
651            await self._wait_for_data('readexactly')
652
653        if self._smart_buffer.size() == n:
654            data = self._smart_buffer.read_data(self._smart_buffer.size())
655        else:
656            data = self._smart_buffer.read_data(n)
657
658        self._maybe_resume_transport()
659        return data
async def read_message(self):
661    async def read_message(self):
662        message_len_encoded = await self.readexactly(self._message_protocol_settings._message_size_len)
663        message_len = int.from_bytes(message_len_encoded, 'little')
664        return await self.readexactly(message_len)
def message_awailable(self) -> bool:
666    def message_awailable(self) -> bool:
667        message_size_len = self._message_protocol_settings._message_size_len
668        if self._smart_buffer.size() < message_size_len:
669            return False
670
671        message_len_encoded = self._smart_buffer.get_data(message_size_len)
672        message_len = int.from_bytes(message_len_encoded, 'little')
673        if self._smart_buffer.size() < (message_size_len + message_len):
674            return False
675        
676        return True
def transport_pause_reading(self):
678    def transport_pause_reading(self):
679        try:
680            self._transport.pause_reading()
681        except NotImplementedError:
682            # The transport can't be paused.
683            # We'll just have to buffer all data.
684            # Forget the transport so we don't keep trying.
685            pass
686        else:
687            self._paused = True
def transport_resume_reading(self):
689    def transport_resume_reading(self):
690        self._paused = False
691        self._transport.resume_reading()
Inherited Members
asyncio.streams.StreamReader
exception
set_exception
set_transport
feed_eof
class UdpStreamReaderProtocol(asyncio.streams.StreamReaderProtocol, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamReaderProtocolAbstract):
class UdpStreamReaderProtocol(OriginalStreamReaderProtocol, StreamReaderProtocolAbstract):
    """Stream reader protocol that hands a ``UdpStreamWriter`` to the callback.

    Extends asyncio's ``StreamReaderProtocol``: it remembers the owning stream
    manager and the message protocol settings, and overrides
    ``connection_made()`` so the writer passed to the client-connected
    callback is a ``UdpStreamWriter``.
    """

    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
        """Store `manager` and `message_protocol_settings`, then run the base initializer.

        ``*args`` and ``**kwargs`` are forwarded unchanged to the base
        class ``__init__``.
        """
        self._stream_manager = manager
        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
        super().__init__(*args, **kwargs)

    def connection_made(self, transport):
        """Attach `transport` and start the client-connected callback, if any.

        When ``_reject_connection`` is set, the situation is reported to the
        loop's exception handler, the transport is aborted and nothing else
        is done.
        """
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        
        self._transport = transport
        reader = self._stream_reader
        if reader is not None:
            reader.set_transport(transport)

        # The connection counts as "over SSL" when the transport exposes an
        # 'sslcontext' extra.
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = UdpStreamWriter(transport, self,
                                               reader,
                                               self._loop)
            res = self._client_connected_cb(reader,
                                            self._stream_writer)
            # A coroutine returned by the callback is scheduled as a task on
            # the protocol's loop.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
            
            # Cleared only on this branch, after the callback has been given
            # the reader.
            self._strong_reader = None

Helper class to adapt between Protocol and StreamReader.

(This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader to accidentally call inappropriate methods of the protocol.)

UdpStreamReaderProtocol( manager: UdpStreamManager, message_protocol_settings: cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_base_internal.MessageProtocolSettings, *args, **kwargs)
    def __init__(self, manager: UdpStreamManager, message_protocol_settings: MessageProtocolSettings, *args, **kwargs) -> None:
        """Store `manager` and `message_protocol_settings`, then run the base initializer.

        ``*args`` and ``**kwargs`` are forwarded unchanged to the base
        class ``__init__``.
        """
        self._stream_manager = manager
        self._message_protocol_settings: MessageProtocolSettings = message_protocol_settings
        super().__init__(*args, **kwargs)
def connection_made(self, transport):
700    def connection_made(self, transport):
701        if self._reject_connection:
702            context = {
703                'message': ('An open stream was garbage collected prior to '
704                            'establishing network connection; '
705                            'call "stream.close()" explicitly.')
706            }
707            if self._source_traceback:
708                context['source_traceback'] = self._source_traceback
709            self._loop.call_exception_handler(context)
710            transport.abort()
711            return
712        
713        self._transport = transport
714        reader = self._stream_reader
715        if reader is not None:
716            reader.set_transport(transport)
717
718        self._over_ssl = transport.get_extra_info('sslcontext') is not None
719        if self._client_connected_cb is not None:
720            self._stream_writer = UdpStreamWriter(transport, self,
721                                               reader,
722                                               self._loop)
723            res = self._client_connected_cb(reader,
724                                            self._stream_writer)
725            if coroutines.iscoroutine(res):
726                self._loop.create_task(res)
727            
728            self._strong_reader = None

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

Inherited Members
asyncio.streams.StreamReaderProtocol
connection_lost
data_received
eof_received
asyncio.streams.FlowControlMixin
pause_writing
resume_writing
class UdpStreamWriter(asyncio.streams.StreamWriter, cengal.parallel_execution.asyncio.efficient_streams.versions.v_0.efficient_streams_abstract.StreamWriterAbstract):
class UdpStreamWriter(OriginalStreamWriter, StreamWriterAbstract):
    """Stream writer that buffers outgoing data before handing it to the transport.

    Data given to :meth:`optimized_write` is stored in an internal container
    created by the stream manager. The drain methods pull pieces out of that
    container and pass them to the inherited ``write()`` / ``drain()`` pair.
    An optional background task (the "autonomous writer") can do this
    continuously.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Create the writer; all arguments are forwarded to the base ``StreamWriter``."""
        super().__init__(*args, **kwargs)
        # The protocol object carries the reference to the owning manager.
        self._stream_manager: UdpStreamManager = self._protocol._stream_manager
        self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
            external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
        self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
        self._autonomous_writer_future: Task = None
        self._autonomous_writer_future_stop_requessted: bool = False
        # NOTE(review): the annotation says List[Future], but the entries
        # appended by autonomous_writer_drain_enough() are
        # (lower_water_size, future) tuples.
        self._autonomous_writer_drain_enough_futures: List[Future] = list()

    def optimized_write(self, data):
        """Queue ``data`` in the outgoing container without touching the transport."""
        self._output_to_client.add_piece_of_data(data)

    def owrite(self, data):
        """Short alias for :meth:`optimized_write`."""
        return self.optimized_write(data)

    async def partial_drain(self):
        """Write and drain pieces for as long as the container returns non-empty data.

        Each piece is requested with ``socket_write_fixed_buffer_size.value``
        as the size argument.
        """
        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
        while another_piece_of_data:
            self.write(another_piece_of_data)
            await self.drain()
            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)

    async def pdrain(self):
        """Short alias for :meth:`partial_drain`."""
        return await self.partial_drain()

    async def full_drain(self):
        """Run :meth:`pdrain`, then write and drain whatever the container still reports."""
        await self.pdrain()
        rest_of_the_data_size = self._output_to_client.size()
        if rest_of_the_data_size:
            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
            self.write(another_piece_of_data)
            await self.drain()

    async def fdrain(self):
        """Short alias for :meth:`full_drain`."""
        return await self.full_drain()

    def _release_autonomous_writer_waiters(self):
        """Resolve waiters whose low-water mark is now above the buffered size.

        Waiters that are already done (which includes cancelled ones) are
        dropped; the remaining unresolved waiters are kept for the next pass.
        """
        current_size = self._output_to_client.size()
        pending_waiters = self._autonomous_writer_drain_enough_futures
        still_waiting = type(pending_waiters)()
        self._autonomous_writer_drain_enough_futures = still_waiting
        for item in pending_waiters:
            lower_water_size, future = item
            # Future.done() is also True for cancelled futures, so a single
            # check is enough.
            if future.done():
                continue

            if current_size < lower_water_size:
                future.set_result(None)
            else:
                still_waiting.append(item)

    async def _autonomous_writer_impl(self):
        """Body of the autonomous writer: flush buffered data until a stop is requested."""
        # NOTE(review): TimedYield is defined outside this block; presumably
        # it hands control back to the event loop between passes — confirm.
        ty = TimedYield(0)
        while not self._autonomous_writer_future_stop_requessted:
            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
            self._release_autonomous_writer_waiters()
            while another_piece_of_data:
                self.write(another_piece_of_data)
                await self.drain()
                another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
                self._release_autonomous_writer_waiters()

            await ty()

    def start_autonomous_writer(self):
        """Start the autonomous writer and remember the resulting task."""
        # NOTE(review): the coroutine *function* is passed, not a coroutine
        # object; confirm that the `create_task` in scope accepts a callable.
        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)

    def start_aw(self):
        """Short alias for :meth:`start_autonomous_writer`."""
        return self.start_autonomous_writer()

    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
        """Request the autonomous writer to stop and wait until its task finishes.

        Args:
            timeout (Optional[Union[int, float]], optional): How long to wait
                for the writer task. Defaults to 0. 0 - infinite timeout;
                None - default timeout (the stream manager's
                ``autonomous_writer_stop_default_timeout``).

        Returns:
            The result of the awaited writer task, or None when no writer
            task is running or a stop is already in progress.

        Raises:
            asyncio.TimeoutError: If a non-zero timeout elapses first.
        """
        result = None
        if timeout is None:
            timeout = self._stream_manager.autonomous_writer_stop_default_timeout

        if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
            self._autonomous_writer_future_stop_requessted = True
            try:
                if timeout:
                    result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
                else:
                    result = await self._autonomous_writer_future
            finally:
                # Reset the state even when the wait times out or the task
                # raised; otherwise the stale stop flag would make a freshly
                # started writer exit immediately and later stops no-ops.
                self._autonomous_writer_future = None
                self._autonomous_writer_future_stop_requessted = False

        return result

    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
        """Short alias for :meth:`stop_autonomous_writer`.

        Args:
            timeout (Optional[Union[int, float]], optional): Passed through
                unchanged. Defaults to 0. 0 - infinite timeout; None - default timeout

        Returns:
            Whatever :meth:`stop_autonomous_writer` returns.
        """
        return await self.stop_autonomous_writer(timeout)

    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
        """Wait until the buffered size drops below ``lower_water_size``.

        Returns immediately when the buffer is already below the mark.
        Otherwise a future is registered; it is resolved by
        ``_release_autonomous_writer_waiters``.

        Args:
            lower_water_size: Low-water mark. When None, three times
                ``socket_write_fixed_buffer_size.value`` is used.
        """
        if lower_water_size is None:
            lower_water_size = self.socket_write_fixed_buffer_size.value * 3

        if lower_water_size <= self._output_to_client.size():
            future: Future = self._loop.create_future()
            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
            await future

    async def aw_drain_enough(self):
        """Short alias for :meth:`autonomous_writer_drain_enough` with the default mark."""
        await self.autonomous_writer_drain_enough()

    def optimized_write_message(self, data):
        """Queue ``data`` prefixed with its length as a little-endian integer.

        The prefix width comes from the protocol's
        ``_message_protocol_settings._message_size_len``.
        """
        self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings._message_size_len, 'little') + data)

    def owrite_message(self, data):
        """Short alias for :meth:`optimized_write_message`."""
        self.optimized_write_message(data)

    async def send_message(self, data):
        """Queue ``data`` as a length-prefixed message and fully drain the buffer."""
        self.optimized_write_message(data)
        await self.fdrain()

Wraps a Transport.

This exposes write(), writelines(), [can_]write_eof(), get_extra_info() and close(). It adds drain() which returns an optional Future on which you can wait for flow control. It also adds a transport property which references the Transport directly.

UdpStreamWriter(*args, **kwargs)
732    def __init__(self, *args, **kwargs) -> None:
733        super().__init__(*args, **kwargs)
734        self._stream_manager: UdpStreamManager = self._protocol._stream_manager
735        self._output_to_client: DynamicListOfPiecesDequeWithLengthControl = self._stream_manager.output_to_client_container_type(
736            external_data_length=self._stream_manager.io_memory_management.global_out__data_full_size)
737        self.socket_write_fixed_buffer_size: ValueExistence = self._stream_manager.io_memory_management.socket_write_fixed_buffer_size
738        self._autonomous_writer_future: Task = None
739        self._autonomous_writer_future_stop_requessted: bool = False
740        self._autonomous_writer_drain_enough_futures: List[Future] = list()
socket_write_fixed_buffer_size: cengal.code_flow_control.smart_values.versions.v_2.smart_values.ValueExistence
def optimized_write(self, data):
742    def optimized_write(self, data):
743        self._output_to_client.add_piece_of_data(data)
744        # self.write(data)
def owrite(self, data):
746    def owrite(self, data):
747        return self.optimized_write(data)
async def partial_drain(self):
749    async def partial_drain(self):
750        another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
751        while another_piece_of_data:
752            self.write(another_piece_of_data)
753            await self.drain()
754            another_piece_of_data = self._output_to_client.get_data(self.socket_write_fixed_buffer_size.value)
async def pdrain(self):
756    async def pdrain(self):
757        return await self.partial_drain()
async def full_drain(self):
759    async def full_drain(self):
760        await self.pdrain()
761        rest_of_the_data_size = self._output_to_client.size()
762        if rest_of_the_data_size:
763            another_piece_of_data = self._output_to_client.get_data(rest_of_the_data_size)
764            self.write(another_piece_of_data)
765            await self.drain()
async def fdrain(self):
767    async def fdrain(self):
768        return await self.full_drain()
def start_autonomous_writer(self):
796    def start_autonomous_writer(self):
797        self._autonomous_writer_future = create_task(self._autonomous_writer_impl)
def start_aw(self):
799    def start_aw(self):
800        return self.start_autonomous_writer()
async def stop_autonomous_writer(self, timeout: Union[int, float, NoneType] = 0):
802    async def stop_autonomous_writer(self, timeout: Optional[Union[int, float]] = 0):
803        """_summary_
804
805        Args:
806            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
807
808        Returns:
809            _type_: _description_
810        """
811        result = None
812        if timeout is None:
813            timeout = self._stream_manager.autonomous_writer_stop_default_timeout
814        
815        if self._autonomous_writer_future and (not self._autonomous_writer_future_stop_requessted):
816            self._autonomous_writer_future_stop_requessted = True
817            if timeout:
818                result = await asyncio.wait_for(self._autonomous_writer_future, timeout)
819            else:
820                result = await self._autonomous_writer_future
821            
822            self._autonomous_writer_future = None
823            self._autonomous_writer_future_stop_requessted = False
824        
825        return result

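Request the autonomous writer task to stop and wait until it finishes.

Args: timeout (Optional[Union[int, float]], optional): How long to wait for the writer task. Defaults to 0. 0 - infinite timeout; None - default timeout (the stream manager's autonomous_writer_stop_default_timeout)

Returns: The result of the awaited writer task, or None when no writer task is running or a stop is already in progress.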

async def stop_aw(self, timeout: Union[int, float, NoneType] = 0):
827    async def stop_aw(self, timeout: Optional[Union[int, float]] = 0):
828        """_summary_
829
830        Args:
831            timeout (Optional[Union[int, float]], optional): _description_. Defaults to 0. 0 - infinit timeout; None - default timeout
832
833        Returns:
834            _type_: _description_
835        """
836        return await self.stop_autonomous_writer(timeout)

Short alias for stop_autonomous_writer.

Args: timeout (Optional[Union[int, float]], optional): Passed through unchanged to stop_autonomous_writer. Defaults to 0. 0 - infinite timeout; None - default timeout

Returns: Whatever stop_autonomous_writer returns.

async def autonomous_writer_drain_enough(self, lower_water_size: Union[int, NoneType] = None):
838    async def autonomous_writer_drain_enough(self, lower_water_size: Optional[int] = None):
839        if lower_water_size is None:
840            lower_water_size = self.socket_write_fixed_buffer_size.value * 3
841            # print(f'lower_water_size: {lower_water_size}')
842            # lower_water_size = cpu_info().l3_cache_size
843        
844        if lower_water_size <= self._output_to_client.size():
845            future: Future = self._loop.create_future()
846            self._autonomous_writer_drain_enough_futures.append((lower_water_size, future))
847            await future
async def aw_drain_enough(self):
849    async def aw_drain_enough(self):
850        await self.autonomous_writer_drain_enough()
def optimized_write_message(self, data):
852    def optimized_write_message(self, data):
853        self.optimized_write(len(data).to_bytes(self._protocol._message_protocol_settings._message_size_len, 'little') + data)
def owrite_message(self, data):
855    def owrite_message(self, data):
856        self.optimized_write_message(data)
async def send_message(self, data):
858    async def send_message(self, data):
859        self.optimized_write_message(data)
860        await self.fdrain()
Inherited Members
asyncio.streams.StreamWriter
transport
write
writelines
write_eof
can_write_eof
close
is_closing
wait_closed
get_extra_info
drain