create_datagram_endpoint
async def create_datagram_endpoint() -> Tuple[Union[_SelectorDatagramTransport, _ProactorDatagramTransport], DatagramProtocol]
PARTS OF: DatagramProtocol
DatagramProtocol
class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""
__slots__ = ()
def datagram_received(self, data, addr):
"""Called when some datagram is received."""
def error_received(self, exc):
"""Called when a send or receive operation raises an OSError.
(Other than BlockingIOError or InterruptedError.)
"""
BaseProtocol
class BaseProtocol:
"""Common base class for protocol interfaces.
Usually user implements protocols that derived from BaseProtocol
like Protocol or ProcessProtocol.
The only case when BaseProtocol should be implemented directly is
write-only transport like write pipe
"""
__slots__ = ()
def connection_made(self, transport):
"""Called when a connection is made.
The argument is the transport representing the pipe connection.
To receive data, wait for data_received() calls.
When the connection is closed, connection_lost() is called.
"""
def connection_lost(self, exc):
"""Called when the connection is lost or closed.
The argument is an exception object or None (the latter
meaning a regular EOF is received or the connection was
aborted or closed).
"""
def pause_writing(self):
"""Called when the transport's buffer goes over the high-water mark.
Pause and resume calls are paired -- pause_writing() is called
once when the buffer goes strictly over the high-water mark
(even if subsequent writes increases the buffer size even
more), and eventually resume_writing() is called once when the
buffer size reaches the low-water mark.
Note that if the buffer size equals the high-water mark,
pause_writing() is not called -- it must go strictly over.
Conversely, resume_writing() is called when the buffer size is
equal or lower than the low-water mark. These end conditions
are important to ensure that things go as expected when either
mark is zero.
NOTE: This is the only Protocol callback that is not called
through EventLoop.call_soon() -- if it were, it would have no
effect when it's most needed (when the app keeps writing
without yielding until pause_writing() is called).
"""
def resume_writing(self):
"""Called when the transport's buffer drains below the low-water mark.
See pause_writing() for details.
"""
FULL: DatagramProtocol
class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""
"""Common base class for protocol interfaces.
Usually user implements protocols that derived from BaseProtocol
like Protocol or ProcessProtocol.
The only case when BaseProtocol should be implemented directly is
write-only transport like write pipe
"""
__slots__ = ()
def datagram_received(self, data, addr):
"""Called when some datagram is received."""
def error_received(self, exc):
"""Called when a send or receive operation raises an OSError.
(Other than BlockingIOError or InterruptedError.)
"""
def connection_made(self, transport):
"""Called when a connection is made.
The argument is the transport representing the pipe connection.
To receive data, wait for data_received() calls.
When the connection is closed, connection_lost() is called.
"""
def connection_lost(self, exc):
"""Called when the connection is lost or closed.
The argument is an exception object or None (the latter
meaning a regular EOF is received or the connection was
aborted or closed).
"""
def pause_writing(self):
"""Called when the transport's buffer goes over the high-water mark.
Pause and resume calls are paired -- pause_writing() is called
once when the buffer goes strictly over the high-water mark
(even if subsequent writes increases the buffer size even
more), and eventually resume_writing() is called once when the
buffer size reaches the low-water mark.
Note that if the buffer size equals the high-water mark,
pause_writing() is not called -- it must go strictly over.
Conversely, resume_writing() is called when the buffer size is
equal or lower than the low-water mark. These end conditions
are important to ensure that things go as expected when either
mark is zero.
NOTE: This is the only Protocol callback that is not called
through EventLoop.call_soon() -- if it were, it would have no
effect when it's most needed (when the app keeps writing
without yielding until pause_writing() is called).
"""
def resume_writing(self):
"""Called when the transport's buffer drains below the low-water mark.
See pause_writing() for details.
"""
PARTS OF: _SelectorDatagramTransport
_SelectorDatagramTransport
class _SelectorDatagramTransport(_SelectorTransport):
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
def _read_ready(self) -> None:
raise NotImplementedError
def sendto(self, data, addr=None) -> None:
raise NotImplementedError
def _sendto_ready(self) -> None:
raise NotImplementedError
_SelectorTransport
class _SelectorTransport(transports._FlowControlMixin,
transports.Transport):
max_size = 256 * 1024 # Buffer size passed to recv().
_buffer_factory = bytearray # Constructs initial value for self._buffer.
# Attribute used in the destructor: it must be set even if the constructor
# is not called (see _SelectorSslTransport which may start by raising an
# exception)
_sock = None
def __init__(self, loop, sock, protocol, extra=None, server=None):
raise NotImplementedError
def __repr__(self) -> str:
raise NotImplementedError
def abort(self) -> None:
raise NotImplementedError
def set_protocol(self, protocol: DatagramProtocol) -> None:
raise NotImplementedError
def is_closing(self) -> bool:
raise NotImplementedError
def __del__(self, _warn=warnings.warn) -> None:
raise NotImplementedError
def _fatal_error(self, exc, message='Fatal error on transport') -> None:
# Should be called from exception handler only.
raise NotImplementedError
def _force_close(self, exc) -> None:
raise NotImplementedError
def _call_connection_lost(self, exc) -> None:
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
def _add_reader(self, fd, callback, *args) -> None:
raise NotImplementedError
_FlowControlMixin
class _FlowControlMixin(Transport):
def __init__(self, extra=None, loop=None):
raise NotImplementedError
def _maybe_pause_protocol(self) -> None:
raise NotImplementedError
def _maybe_resume_protocol(self) -> None:
raise NotImplementedError
def get_write_buffer_limits(self) -> Tuple[int, int]:
raise NotImplementedError
def _set_write_buffer_limits(self, high=None, low=None) -> None:
raise NotImplementedError
def set_write_buffer_limits(self, high=None, low=None) -> None:
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
Transport
class Transport(ReadTransport, WriteTransport):
pass
ReadTransport
class ReadTransport(BaseTransport):
def is_reading(self) -> bool:
"""Return True if the transport is receiving."""
raise NotImplementedError
def pause_reading(self) -> None:
"""Pause the receiving end.
No data will be passed to the protocol's data_received()
method until resume_reading() is called.
"""
raise NotImplementedError
def resume_reading(self) -> None:
"""Resume the receiving end.
Data received will once again be passed to the protocol's
data_received() method.
"""
raise NotImplementedError
WriteTransport
class WriteTransport(BaseTransport):
def set_write_buffer_limits(self, high=None, low=None):
"""Set the high- and low-water limits for write flow control.
These two values control when to call the protocol's
pause_writing() and resume_writing() methods. If specified,
the low-water limit must be less than or equal to the
high-water limit. Neither value can be negative.
The defaults are implementation-specific. If only the
high-water limit is given, the low-water limit defaults to an
implementation-specific value less than or equal to the
high-water limit. Setting high to zero forces low to zero as
well, and causes pause_writing() to be called whenever the
buffer becomes non-empty. Setting low to zero causes
resume_writing() to be called only once the buffer is empty.
Use of zero for either limit is generally sub-optimal as it
reduces opportunities for doing I/O and computation
concurrently.
"""
raise NotImplementedError
def get_write_buffer_size(self):
"""Return the current size of the write buffer."""
raise NotImplementedError
def write(self, data):
"""Write some data bytes to the transport.
This does not block; it buffers the data and arranges for it
to be sent out asynchronously.
"""
raise NotImplementedError
def writelines(self, list_of_data):
"""Write a list (or any iterable) of data bytes to the transport.
The default implementation concatenates the arguments and
calls write() on the result.
"""
data = b''.join(list_of_data)
self.write(data)
def write_eof(self):
"""Close the write end after flushing buffered data.
(This is like typing ^D into a UNIX program reading from stdin.)
Data may still be received.
"""
raise NotImplementedError
def can_write_eof(self):
"""Return True if this transport supports write_eof(), False if not."""
raise NotImplementedError
def abort(self):
"""Close the transport immediately.
Buffered data will be lost. No more data will be received.
The protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
BaseTransport
class BaseTransport:
def __init__(self, extra=None):
raise NotImplementedError
def get_extra_info(self, name, default=None):
"""Get optional transport information."""
return self._extra.get(name, default)
def is_closing(self):
"""Return True if the transport is closing or closed."""
raise NotImplementedError
def close(self):
"""Close the transport.
Buffered data will be flushed asynchronously. No more data
will be received. After all buffered data is flushed, the
protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
def set_protocol(self, protocol: DatagramProtocol):
"""Set a new protocol."""
raise NotImplementedError
def get_protocol(self) -> DatagramProtocol:
"""Return the current protocol."""
raise NotImplementedError
FULL: _SelectorDatagramTransport
class _SelectorDatagramTransport(_SelectorTransport):
max_size = 256 * 1024 # Buffer size passed to recv().
_buffer_factory = bytearray # Constructs initial value for self._buffer.
# Attribute used in the destructor: it must be set even if the constructor
# is not called (see _SelectorSslTransport which may start by raising an
# exception)
_sock = None
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
raise NotImplementedError
def _read_ready(self) -> None:
raise NotImplementedError
def sendto(self, data, addr=None) -> None:
raise NotImplementedError
def _sendto_ready(self) -> None:
raise NotImplementedError
def __repr__(self) -> str:
raise NotImplementedError
def abort(self) -> None:
raise NotImplementedError
def __del__(self, _warn=warnings.warn) -> None:
raise NotImplementedError
def _fatal_error(self, exc, message='Fatal error on transport') -> None:
# Should be called from exception handler only.
raise NotImplementedError
def _force_close(self, exc) -> None:
raise NotImplementedError
def _call_connection_lost(self, exc) -> None:
raise NotImplementedError
def _add_reader(self, fd, callback, *args) -> None:
raise NotImplementedError
def _maybe_pause_protocol(self) -> None:
raise NotImplementedError
def _maybe_resume_protocol(self) -> None:
raise NotImplementedError
def get_write_buffer_limits(self) -> Tuple[int, int]:
raise NotImplementedError
def _set_write_buffer_limits(self, high=None, low=None) -> None:
raise NotImplementedError
def set_write_buffer_limits(self, high=None, low=None) -> None:
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
def is_reading(self) -> bool:
"""Return True if the transport is receiving."""
raise NotImplementedError
def pause_reading(self) -> None:
"""Pause the receiving end.
No data will be passed to the protocol's data_received()
method until resume_reading() is called.
"""
raise NotImplementedError
def resume_reading(self) -> None:
"""Resume the receiving end.
Data received will once again be passed to the protocol's
data_received() method.
"""
raise NotImplementedError
def get_extra_info(self, name, default=None):
"""Get optional transport information."""
return self._extra.get(name, default)
def is_closing(self):
"""Return True if the transport is closing or closed."""
raise NotImplementedError
def close(self):
"""Close the transport.
Buffered data will be flushed asynchronously. No more data
will be received. After all buffered data is flushed, the
protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
def set_protocol(self, protocol: DatagramProtocol):
"""Set a new protocol."""
raise NotImplementedError
def get_protocol(self) -> DatagramProtocol:
"""Return the current protocol."""
raise NotImplementedError
PARTS OF: _ProactorDatagramTransport
_ProactorDatagramTransport
class _ProactorDatagramTransport(_ProactorBasePipeTransport):
max_size = 256 * 1024
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
raise NotImplementedError
def _set_extra(self, sock) -> None:
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
def abort(self) -> None:
raise NotImplementedError
def sendto(self, data, addr=None) -> None:
raise NotImplementedError
def _loop_writing(self, fut=None) -> None:
raise NotImplementedError
def _loop_reading(self, fut=None) -> None:
raise NotImplementedError
def _sendto_ready(self) -> None:
raise NotImplementedError
def _sendto_ready(self) -> None:
raise NotImplementedError
_ProactorBasePipeTransport
class _ProactorBasePipeTransport(transports._FlowControlMixin,
transports.BaseTransport):
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
raise NotImplementedError
def __repr__(self) -> str:
raise NotImplementedError
def _set_extra(self, sock) -> None:
raise NotImplementedError
def set_protocol(self, protocol: DatagramProtocol) -> None:
raise NotImplementedError
def get_protocol(self) -> DatagramProtocol:
raise NotImplementedError
def is_closing(self) -> bool:
raise NotImplementedError
def close(self) -> None:
raise NotImplementedError
def __del__(self, _warn=warnings.warn) -> None:
raise NotImplementedError
def _fatal_error(self, exc, message='Fatal error on pipe transport') -> None:
raise NotImplementedError
def _force_close(self, exc) -> None:
raise NotImplementedError
def _call_connection_lost(self, exc) -> None:
raise NotImplementedError
def get_write_buffer_size(self) -> int:
raise NotImplementedError
FULL: _ProactorDatagramTransport
class _ProactorDatagramTransport(_ProactorBasePipeTransport):
max_size = 256 * 1024
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
raise NotImplementedError
def sendto(self, data, addr=None) -> None:
raise NotImplementedError
def _loop_writing(self, fut=None) -> None:
raise NotImplementedError
def _loop_reading(self, fut=None) -> None:
raise NotImplementedError
def _sendto_ready(self) -> None:
raise NotImplementedError
def __repr__(self) -> str:
raise NotImplementedError
def _set_extra(self, sock) -> None:
raise NotImplementedError
def __del__(self, _warn=warnings.warn) -> None:
raise NotImplementedError
def _fatal_error(self, exc, message='Fatal error on pipe transport') -> None:
raise NotImplementedError
def _force_close(self, exc) -> None:
raise NotImplementedError
def _call_connection_lost(self, exc) -> None:
raise NotImplementedError
def _maybe_pause_protocol(self) -> None:
raise NotImplementedError
def _maybe_resume_protocol(self) -> None:
raise NotImplementedError
def get_write_buffer_limits(self) -> Tuple[int, int]:
raise NotImplementedError
def _set_write_buffer_limits(self, high=None, low=None) -> None:
raise NotImplementedError
def is_reading(self) -> bool:
"""Return True if the transport is receiving."""
raise NotImplementedError
def pause_reading(self) -> None:
"""Pause the receiving end.
No data will be passed to the protocol's data_received()
method until resume_reading() is called.
"""
raise NotImplementedError
def resume_reading(self) -> None:
"""Resume the receiving end.
Data received will once again be passed to the protocol's
data_received() method.
"""
raise NotImplementedError
def set_write_buffer_limits(self, high=None, low=None):
"""Set the high- and low-water limits for write flow control.
These two values control when to call the protocol's
pause_writing() and resume_writing() methods. If specified,
the low-water limit must be less than or equal to the
high-water limit. Neither value can be negative.
The defaults are implementation-specific. If only the
high-water limit is given, the low-water limit defaults to an
implementation-specific value less than or equal to the
high-water limit. Setting high to zero forces low to zero as
well, and causes pause_writing() to be called whenever the
buffer becomes non-empty. Setting low to zero causes
resume_writing() to be called only once the buffer is empty.
Use of zero for either limit is generally sub-optimal as it
reduces opportunities for doing I/O and computation
concurrently.
"""
raise NotImplementedError
def get_write_buffer_size(self) -> int:
"""Return the current size of the write buffer."""
raise NotImplementedError
def write(self, data):
"""Write some data bytes to the transport.
This does not block; it buffers the data and arranges for it
to be sent out asynchronously.
"""
raise NotImplementedError
def writelines(self, list_of_data):
"""Write a list (or any iterable) of data bytes to the transport.
The default implementation concatenates the arguments and
calls write() on the result.
"""
data = b''.join(list_of_data)
self.write(data)
def write_eof(self):
"""Close the write end after flushing buffered data.
(This is like typing ^D into a UNIX program reading from stdin.)
Data may still be received.
"""
raise NotImplementedError
def can_write_eof(self):
"""Return True if this transport supports write_eof(), False if not."""
raise NotImplementedError
def abort(self):
"""Close the transport immediately.
Buffered data will be lost. No more data will be received.
The protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
def get_extra_info(self, name, default=None):
"""Get optional transport information."""
return self._extra.get(name, default)
def is_closing(self):
"""Return True if the transport is closing or closed."""
raise NotImplementedError
def close(self):
"""Close the transport.
Buffered data will be flushed asynchronously. No more data
will be received. After all buffered data is flushed, the
protocol's connection_lost() method will (eventually) be
called with None as its argument.
"""
raise NotImplementedError
def set_protocol(self, protocol: DatagramProtocol):
"""Set a new protocol."""
raise NotImplementedError
def get_protocol(self) -> DatagramProtocol:
"""Return the current protocol."""
raise NotImplementedError