cengal.io.asock_io.versions.v_1.abstract

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18import socket
 19import errno
 20import copy
 21import enum
 22from contextlib import contextmanager
 23from collections import deque
 24from typing import Set, Iterable
 25
 26"""
 27Module Docstring
 28Docstrings: http://www.python.org/dev/peps/pep-0257/
 29"""
 30
 31__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 32__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 33__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 34__license__ = "Apache License, Version 2.0"
 35__version__ = "4.4.1"
 36__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 37__email__ = "gtalk@butenkoms.space"
 38# __status__ = "Prototype"
 39__status__ = "Development"
 40# __status__ = "Production"
 41
 42
 43class LoopIsAlreadyBegun(Exception):
 44    """
 45    You can not run NetIOUserApi.start() if it was already started (and still running).
 46    """
 47    pass
 48
 49
 50class WrongConnectionType(Exception):
 51    """
 52    You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of
 53    connections are made only from (and by) inside of IO loop and logic itself.
 54    """
 55    pass
 56
 57
 58class CanNotMakeConnection(Exception):
 59    """
 60    Currently not used. If there will be some exception on connect() call - it will be raised without any changes.
 61    """
 62    pass
 63
 64
 65# class ConnectionType(enum.Enum):
 66#     passive = 0  # passive socket (bind())
 67#     active_accepted = 1  # active accepted socket (accept())
 68#     active_connected = 2  # active connected socket (connect())
 69#
 70#
 71# class ConnectionState(enum.Enum):
 72#     not_connected_yet = 0  # socket is not in connection process
 73#     waiting_for_connection = 1  # socket is in connection process (async connection is delayed)
 74#     connected = 2  # socket is successfully connected
 75#     worker_fault = 3  # there was unhandled exception from one of the WorkerBase callbacks
 76#     io_fault = 4  # there was some IO trouble
 77#     waiting_for_disconnection = 5  # connection was marked as "should be closed" but was not closed yet
 78#     disconnected = 6  # socket is closed
 79
 80
 81class NonSocketConnectionSettings:
 82    pass
 83
 84
 85# class ConnectionDirectionRole:
 86#     server = 0
 87#     client = 1
 88
 89
 90class ConnectionType:
 91    passive = 0  # passive socket (bind())
 92    active_accepted = 1  # active accepted socket (accept())
 93    active_connected = 2  # active connected socket (connect())
 94
 95
 96class ConnectionState:
 97    not_connected_yet = 0  # socket is not in connection process
 98    waiting_for_connection = 1  # socket is in connection process (async connection is delayed)
 99    connected = 2  # socket is successfully connected
100    worker_fault = 3  # there was unhandled exception from one of the WorkerBase callbacks
101    io_fault = 4  # there was some IO trouble
102    waiting_for_disconnection = 5  # connection was marked as "should be closed" but was not closed yet
103    disconnected = 6  # socket is closed
104
105
106class ConnectionInfo:
107    def __init__(self,
108                 worker_obj,
109                 connection_type: ConnectionType,
110                 socket_address=None,
111                 socket_family=socket.AF_INET,
112                 socket_type=socket.SOCK_STREAM,
113                 socket_protocol=0,
114                 socket_fileno=None,
115                 backlog=0):
116        """
117        :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive
118            connection - it (worker_obj) will be inherited by the descendant active_accepted connections
119            by copy.copy() call (see WorkerBase.__copy__() method for more info)
120        :param connection_type: see ConnectionType() description
121        :param socket_address:  see socket.bind()/socket.connect() docs
122        :param socket_family: see socket.socket() docs
123        :param socket_type: see socket.socket() docs
124        :param socket_protocol: see socket.socket() docs
125        :param socket_fileno: see socket.socket() docs
126        :param backlog: see socket.listen() docs
127        """
128        self.worker_obj = worker_obj
129        self.connection_type = connection_type
130        self.socket_address = socket_address
131        self.socket_family = socket_family
132        self.socket_type = socket_type
133        self.socket_protocol = socket_protocol
134        self.socket_fileno = socket_fileno
135        self.backlog = backlog
136
137
138class Connection:
139    """
140    Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself
141    """
142    def __init__(self,
143                 connection_id,
144                 connection_info: ConnectionInfo,
145                 connection_and_address_pair: tuple,
146                 connection_state: ConnectionState,
147                 connection_name=None,
148                 ):
149        """
150        :param connection_id: unique connection identificator (unique within the IO object). You may use some random
151            GUID if you are creating connection by your self.
152        :param connection_info: new connection will be created using information provided in connection_info object.
153            See ConnectionInfo() for more information
154        :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket
155            is in the process of connection. But only if it was made by IO loop.).
156        :param connection_state: see ConnectionState for more information
157        :param connection_name: name of the connection (if it was provided)
158        """
159        self.connection_id = connection_id
160        self.connection_info = connection_info
161        self.conn, self.address = connection_and_address_pair
162        self.connection_state = connection_state
163        self.connection_name = connection_name
164        self.worker_obj = connection_info.worker_obj
165        self.read_data = b''  # already read data
166        self.must_be_written_data = memoryview(b'')  # this data should be written
167        self.force_write_call = False
168
169    def add_must_be_written_data(self, data):
170        """
171        Use this method to add data to output buffers
172        :param data: some new output data to be send through this connection
173        :return:
174        """
175        self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)
176
177
178class NetIOUserApi:
179    """
180    You may rely and use freely use methods of this base class from inside your program or from inside your worker
181    (WorkerBase).
182    """
183    def __init__(self):
184        super().__init__()
185        self.all_connections = set()
186        self.passive_connections = set()
187
188        self.connection_by_id = dict()
189        self.connection_by_name = dict()
190        self.connection_by_fileno = dict()
191
192    def start(self, destroy_on_finish=True):
193        """
194        Will start IO loop
195        :param destroy_on_finish: if True - destroy() will be called from inside of this method
196        :return:
197        """
198        raise NotImplementedError()
199
200    def stop(self):
201        """
202        Will initiate IO loop stop process
203        :return:
204        """
205        raise NotImplementedError()
206
207    def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection:
208        """
209        Will create connection from given connection_info object. Than connection will be established. Immediate or
210        delayed - depends on the connection type:
211        - ConnectionType.passive - immediate;
212        - ConnectionType.active_connected - delayed.
213        In both cases WorkerBase.on_connect will be called immediately after connection will be successfully
214        established (IF it will be successfully established).
215        :param connection_info: new connection will be created using information provided in connection_info object.
216            See ConnectionInfo() for more information
217        :param name: name of the connection. If you'll provide it - you will be able to find this connection in
218            NetIOUserApi.connection_by_name dictionary by it's name.
219        :return:
220        """
221        raise NotImplementedError()
222
223    def add_connection(self, connection: Connection):
224        """
225        Will register already established connection. You need to use this method for example if you have already
226        connected socket
227        :param connection:
228        :return:
229        """
230        raise NotImplementedError()
231
232    def remove_connection(self, connection: Connection):
233        """
234        Will close and remove connection
235        :param connection:
236        :return:
237        """
238        raise NotImplementedError()
239
240    def check_is_connection_need_to_sent_data(self, connection: Connection):
241        """
242        Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call
243        by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that
244        connection by your self.
245        :param connection:
246        :return:
247        """
248        raise NotImplementedError()
249
250
251class NetIOCallbacks:
252    """
253    Callbacks from this class will be called from inside (and by) IOMethodBase loop.
254    """
255    def __init__(self):
256        super().__init__()
257
258    def on_accept_connection(self, connection: Connection):
259        raise NotImplementedError()
260
261    def on_connected(self, connection: Connection):
262        raise NotImplementedError()
263
264    def on_read(self, connection: Connection):
265        raise NotImplementedError()
266
267    def on_write(self, connection: Connection):
268        raise NotImplementedError()
269
270    def on_close(self, connection: Connection):
271        raise NotImplementedError()
272
273
274class NetIOBase(NetIOUserApi, NetIOCallbacks):
275    """
276    Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).
277    """
278    def __init__(self, transport):
279        """
280
281        :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
282        """
283        super().__init__()
284        self.method = transport(self)
285
286    def destroy(self):
287        raise NotImplementedError()
288
289
290class WorkerBase:
291    """
292    Base class for all workers.
293    on_* callbacks will be called by the IO loop.
294
295    General info:
296    You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any
297    callback.
298    You can write output data (to be send) to self.connection at any time (see "Caution") from any callback.
299    """
300    def __init__(self, api: NetIOUserApi=None, connection: Connection=None):
301        """
302        Caution:
303        Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that
304        they will be set before any callback call, but not at the construction process.
305
306        :param api: link to the constructed network io object
307        :param connection: link to the constructed connection object
308        """
309        self.api = api
310        self.connection = connection
311
312    def on_connect(self):
313        """
314        Will be called after connection successfully established
315        :return:
316        """
317        pass
318
319    def on_read(self):
320        """
321        Will be called if there is some NEW data in the connection input buffer
322        :return:
323        """
324        pass
325
326    def on_no_more_data_to_write(self):
327        """
328        Will be called after all data is sent.
329        Normally it will be called once (one single shot after each portion of out data is sent).
330        If you'll set self.connection.force_write_call to True state - this callback may be called continuously
331        (but not guaranteed: it depends of used IOMethod implementation)
332        :return:
333        """
334        pass
335
336    def on_connection_lost(self):
337        """
338        Will be called AFTER connection socket was actually closed and removed from IOMethod checking list.
339        At this time, self.connection.connection_state is already set to ConnectionState.disconnected.
340        :return:
341        """
342        pass
343
344    def __copy__(self):
345        """
346        This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from
347        current object to that new instance. It will be called when new peer is connected to existing passive connection
348        So this is the way you may use to give all new connection some links to some global data by worker object of
349        the passive connection replication process.
350        :return:
351        """
352        raise NotImplementedError()
353
354
355class InlineWorkerBase:
356    __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names',
357                 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately',
358                 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages',
359                 '__hold__client_id')
360
361    def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None,
362                 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None):
363        """
364
365        :param keyword: client keyword. You may check for a known keywords to act appropriately
366        :param socket_family:
367        :param socket_type:
368        :param socket_proto:
369        :param addr_info: result of socket.getaddrinfo() call
370        :param host_names: result of socket.gethostbyaddr() call
371        """
372        self.client_id = client_id
373        self.keyword = keyword
374        self.socket_family = socket_family
375        self.socket_type = socket_type
376        self.socket_proto = socket_proto
377        self.addr_info = addr_info
378        self.host_names = host_names
379        self.is_in_raw_mode = None
380
381        # self.output_messages = FIFODequeWithLengthControl()
382        self.output_messages = deque()
383        # self.output_messages = list()
384
385    def on__data_received(self, data: bytes):
386        """
387        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
388        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
389        be logged
390        :param data: piece of input data if connection is in RAW-mode and full message otherwise.
391        """
392        pass
393
394    def on__output_buffers_are_empty(self):
395        """
396        Will be called immediately when all output data was send.
397        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
398        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
399        be logged
400        """
401        pass
402
403    def on__connection_lost(self):
404        """
405        Will be called after connection was closed. Current Inline Processor object will be destroyed after this call.
406        Situation with unhandled exception will be logged.
407        """
408        pass
409
410    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
411        raise NotImplementedError()
412
413    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
414        raise NotImplementedError()
415
416    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
417        raise NotImplementedError()
418
419    def __getstate__(self):
420        raise NotImplementedError()
421
422    def __setstate__(self, state):
423        raise NotImplementedError()
424
425
426@contextmanager
427def net_io(net_io_obj: NetIOBase):
428    """
429    Context manager.
430    Usage:
431
432    main_io = NetIOBase()
433    with(net_io(main_io)) as io:
434        print('Preparing connections')
435        connection1 = io.make_connection(...)
436        connection2 = io.make_connection(...)
437        k = c + 12
438        ...
439        connectionN = io.make_connection(...)
440        print('Starting IO loop')
441    print('IO loop was finished properly')
442
443
444    :param net_io_obj: constructed IO instance (object)
445    :return:
446    """
447    try:
448        yield net_io_obj
449        net_io_obj.start(destroy_on_finish=False)
450    finally:
451        net_io_obj.destroy()
452
453
454class IOLoopBase:
455    """
456    Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.)
457    All his methods are called by the NetIOBase instance.
458    """
459    def __init__(self, interface: NetIOBase):
460        self.interface = interface
461        self.should_be_closed = set()
462        pass
463
464    def loop_iteration(self, timeout=-1):
465        """
466        Single IO loop iteration.
467        This method holds all IOMethod logic.
468        :param timeout: float or int. If timeout is negative, the call will block until there is an event.
469        :return:
470        """
471        raise NotImplementedError()
472
473    def destroy(self):
474        """
475        Initiates destruction process
476        :return:
477        """
478        raise NotImplementedError()
479
480    def set__can_read(self, conn: socket.socket, state=True):
481        """
482        Will allow (True) or disallow (False) "socket available to read" checks for socket
483        :param conn: target socket
484        :param state: True - allow; False - disallow
485        :return:
486        """
487        raise NotImplementedError()
488
489    def set__need_write(self, conn: socket.socket, state=True):
490        """
491        Will allow (True) or disallow (False) "socket available to write" checks for socket
492        :param conn: target socket
493        :param state: True - allow; False - disallow
494        :return:
495        """
496        raise NotImplementedError()
497
498    def set__should_be_closed(self, conn: socket.socket):
499        """
500        Mark socket as "should be closed"
501        :param conn: target socket
502        :return:
503        """
504        raise NotImplementedError()
505
506    def add_connection(self, conn: socket.socket):
507        """
508        Will add socket to the internal connections list
509        :param conn: target socket
510        :return:
511        """
512        raise NotImplementedError()
513
514    def remove_connection(self, conn: socket.socket):
515        """
516        Will remove socket from the internal connections list
517        :param conn: target socket
518        :return:
519        """
520        raise NotImplementedError()
class LoopIsAlreadyBegun(builtins.Exception):
44class LoopIsAlreadyBegun(Exception):
45    """
46    You can not run NetIOUserApi.start() if it was already started (and still running).
47    """
48    pass

You can not run NetIOUserApi.start() if it was already started (and still running).

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class WrongConnectionType(builtins.Exception):
51class WrongConnectionType(Exception):
52    """
53    You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of
54    connections are made only from (and by) inside of IO loop and logic itself.
55    """
56    pass

You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of connections are made only from (and by) inside of IO loop and logic itself.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CanNotMakeConnection(builtins.Exception):
59class CanNotMakeConnection(Exception):
60    """
61    Currently not used. If there will be some exception on connect() call - it will be raised without any changes.
62    """
63    pass

Currently not used. If there will be some exception on connect() call - it will be raised without any changes.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class NonSocketConnectionSettings:
82class NonSocketConnectionSettings:
83    pass
class ConnectionType:
91class ConnectionType:
92    passive = 0  # passive socket (bind())
93    active_accepted = 1  # active accepted socket (accept())
94    active_connected = 2  # active connected socket (connect())
passive = 0
active_accepted = 1
active_connected = 2
class ConnectionState:
 97class ConnectionState:
 98    not_connected_yet = 0  # socket is not in connection process
 99    waiting_for_connection = 1  # socket is in connection process (async connection is delayed)
100    connected = 2  # socket is successfully connected
101    worker_fault = 3  # there was unhandled exception from one of the WorkerBase callbacks
102    io_fault = 4  # there was some IO trouble
103    waiting_for_disconnection = 5  # connection was marked as "should be closed" but was not closed yet
104    disconnected = 6  # socket is closed
not_connected_yet = 0
waiting_for_connection = 1
connected = 2
worker_fault = 3
io_fault = 4
waiting_for_disconnection = 5
disconnected = 6
class ConnectionInfo:
107class ConnectionInfo:
108    def __init__(self,
109                 worker_obj,
110                 connection_type: ConnectionType,
111                 socket_address=None,
112                 socket_family=socket.AF_INET,
113                 socket_type=socket.SOCK_STREAM,
114                 socket_protocol=0,
115                 socket_fileno=None,
116                 backlog=0):
117        """
118        :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive
119            connection - it (worker_obj) will be inherited by the descendant active_accepted connections
120            by copy.copy() call (see WorkerBase.__copy__() method for more info)
121        :param connection_type: see ConnectionType() description
122        :param socket_address:  see socket.bind()/socket.connect() docs
123        :param socket_family: see socket.socket() docs
124        :param socket_type: see socket.socket() docs
125        :param socket_protocol: see socket.socket() docs
126        :param socket_fileno: see socket.socket() docs
127        :param backlog: see socket.listen() docs
128        """
129        self.worker_obj = worker_obj
130        self.connection_type = connection_type
131        self.socket_address = socket_address
132        self.socket_family = socket_family
133        self.socket_type = socket_type
134        self.socket_protocol = socket_protocol
135        self.socket_fileno = socket_fileno
136        self.backlog = backlog
ConnectionInfo( worker_obj, connection_type: ConnectionType, socket_address=None, socket_family=<AddressFamily.AF_INET: 2>, socket_type=<SocketKind.SOCK_STREAM: 1>, socket_protocol=0, socket_fileno=None, backlog=0)
108    def __init__(self,
109                 worker_obj,
110                 connection_type: ConnectionType,
111                 socket_address=None,
112                 socket_family=socket.AF_INET,
113                 socket_type=socket.SOCK_STREAM,
114                 socket_protocol=0,
115                 socket_fileno=None,
116                 backlog=0):
117        """
118        :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive
119            connection - it (worker_obj) will be inherited by the descendant active_accepted connections
120            by copy.copy() call (see WorkerBase.__copy__() method for more info)
121        :param connection_type: see ConnectionType() description
122        :param socket_address:  see socket.bind()/socket.connect() docs
123        :param socket_family: see socket.socket() docs
124        :param socket_type: see socket.socket() docs
125        :param socket_protocol: see socket.socket() docs
126        :param socket_fileno: see socket.socket() docs
127        :param backlog: see socket.listen() docs
128        """
129        self.worker_obj = worker_obj
130        self.connection_type = connection_type
131        self.socket_address = socket_address
132        self.socket_family = socket_family
133        self.socket_type = socket_type
134        self.socket_protocol = socket_protocol
135        self.socket_fileno = socket_fileno
136        self.backlog = backlog

:param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive connection - it (worker_obj) will be inherited by the descendant active_accepted connections by copy.copy() call (see WorkerBase.__copy__() method for more info) :param connection_type: see ConnectionType() description :param socket_address: see socket.bind()/socket.connect() docs :param socket_family: see socket.socket() docs :param socket_type: see socket.socket() docs :param socket_protocol: see socket.socket() docs :param socket_fileno: see socket.socket() docs :param backlog: see socket.listen() docs

worker_obj
connection_type
socket_address
socket_family
socket_type
socket_protocol
socket_fileno
backlog
class Connection:
139class Connection:
140    """
141    Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself
142    """
143    def __init__(self,
144                 connection_id,
145                 connection_info: ConnectionInfo,
146                 connection_and_address_pair: tuple,
147                 connection_state: ConnectionState,
148                 connection_name=None,
149                 ):
150        """
151        :param connection_id: unique connection identificator (unique within the IO object). You may use some random
152            GUID if you are creating connection by your self.
153        :param connection_info: new connection will be created using information provided in connection_info object.
154            See ConnectionInfo() for more information
155        :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket
156            is in the process of connection. But only if it was made by IO loop.).
157        :param connection_state: see ConnectionState for more information
158        :param connection_name: name of the connection (if it was provided)
159        """
160        self.connection_id = connection_id
161        self.connection_info = connection_info
162        self.conn, self.address = connection_and_address_pair
163        self.connection_state = connection_state
164        self.connection_name = connection_name
165        self.worker_obj = connection_info.worker_obj
166        self.read_data = b''  # already read data
167        self.must_be_written_data = memoryview(b'')  # this data should be written
168        self.force_write_call = False
169
170    def add_must_be_written_data(self, data):
171        """
172        Use this method to add data to output buffers
173        :param data: some new output data to be send through this connection
174        :return:
175        """
176        self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)

Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself

Connection( connection_id, connection_info: ConnectionInfo, connection_and_address_pair: tuple, connection_state: ConnectionState, connection_name=None)
143    def __init__(self,
144                 connection_id,
145                 connection_info: ConnectionInfo,
146                 connection_and_address_pair: tuple,
147                 connection_state: ConnectionState,
148                 connection_name=None,
149                 ):
150        """
151        :param connection_id: unique connection identificator (unique within the IO object). You may use some random
152            GUID if you are creating connection by your self.
153        :param connection_info: new connection will be created using information provided in connection_info object.
154            See ConnectionInfo() for more information
155        :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket
156            is in the process of connection. But only if it was made by IO loop.).
157        :param connection_state: see ConnectionState for more information
158        :param connection_name: name of the connection (if it was provided)
159        """
160        self.connection_id = connection_id
161        self.connection_info = connection_info
162        self.conn, self.address = connection_and_address_pair
163        self.connection_state = connection_state
164        self.connection_name = connection_name
165        self.worker_obj = connection_info.worker_obj
166        self.read_data = b''  # already read data
167        self.must_be_written_data = memoryview(b'')  # this data should be written
168        self.force_write_call = False

:param connection_id: unique connection identificator (unique within the IO object). You may use some random GUID if you are creating connection by your self. :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket is in the process of connection. But only if it was made by IO loop.). :param connection_state: see ConnectionState for more information :param connection_name: name of the connection (if it was provided)

connection_id
connection_info
connection_state
connection_name
worker_obj
read_data
must_be_written_data
force_write_call
def add_must_be_written_data(self, data):
170    def add_must_be_written_data(self, data):
171        """
172        Use this method to add data to output buffers
173        :param data: some new output data to be send through this connection
174        :return:
175        """
176        self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)

Use this method to add data to output buffers :param data: some new output data to be send through this connection :return:

class NetIOUserApi:
179class NetIOUserApi:
180    """
181    You may rely and use freely use methods of this base class from inside your program or from inside your worker
182    (WorkerBase).
183    """
184    def __init__(self):
185        super().__init__()
186        self.all_connections = set()
187        self.passive_connections = set()
188
189        self.connection_by_id = dict()
190        self.connection_by_name = dict()
191        self.connection_by_fileno = dict()
192
193    def start(self, destroy_on_finish=True):
194        """
195        Will start IO loop
196        :param destroy_on_finish: if True - destroy() will be called from inside of this method
197        :return:
198        """
199        raise NotImplementedError()
200
201    def stop(self):
202        """
203        Will initiate IO loop stop process
204        :return:
205        """
206        raise NotImplementedError()
207
208    def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection:
209        """
210        Will create connection from given connection_info object. Than connection will be established. Immediate or
211        delayed - depends on the connection type:
212        - ConnectionType.passive - immediate;
213        - ConnectionType.active_connected - delayed.
214        In both cases WorkerBase.on_connect will be called immediately after connection will be successfully
215        established (IF it will be successfully established).
216        :param connection_info: new connection will be created using information provided in connection_info object.
217            See ConnectionInfo() for more information
218        :param name: name of the connection. If you'll provide it - you will be able to find this connection in
219            NetIOUserApi.connection_by_name dictionary by it's name.
220        :return:
221        """
222        raise NotImplementedError()
223
224    def add_connection(self, connection: Connection):
225        """
226        Will register already established connection. You need to use this method for example if you have already
227        connected socket
228        :param connection:
229        :return:
230        """
231        raise NotImplementedError()
232
233    def remove_connection(self, connection: Connection):
234        """
235        Will close and remove connection
236        :param connection:
237        :return:
238        """
239        raise NotImplementedError()
240
241    def check_is_connection_need_to_sent_data(self, connection: Connection):
242        """
243        Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call
244        by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that
245        connection by your self.
246        :param connection:
247        :return:
248        """
249        raise NotImplementedError()

You may rely and use freely use methods of this base class from inside your program or from inside your worker (WorkerBase).

all_connections
passive_connections
connection_by_id
connection_by_name
connection_by_fileno
def start(self, destroy_on_finish=True):
193    def start(self, destroy_on_finish=True):
194        """
195        Will start IO loop
196        :param destroy_on_finish: if True - destroy() will be called from inside of this method
197        :return:
198        """
199        raise NotImplementedError()

Will start IO loop :param destroy_on_finish: if True - destroy() will be called from inside of this method :return:

def stop(self):
201    def stop(self):
202        """
203        Will initiate IO loop stop process
204        :return:
205        """
206        raise NotImplementedError()

Will initiate IO loop stop process :return:

def make_connection( self, connection_info: ConnectionInfo = None, name=None) -> Connection:
208    def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection:
209        """
210        Will create connection from given connection_info object. Than connection will be established. Immediate or
211        delayed - depends on the connection type:
212        - ConnectionType.passive - immediate;
213        - ConnectionType.active_connected - delayed.
214        In both cases WorkerBase.on_connect will be called immediately after connection will be successfully
215        established (IF it will be successfully established).
216        :param connection_info: new connection will be created using information provided in connection_info object.
217            See ConnectionInfo() for more information
218        :param name: name of the connection. If you'll provide it - you will be able to find this connection in
219            NetIOUserApi.connection_by_name dictionary by it's name.
220        :return:
221        """
222        raise NotImplementedError()

Will create connection from given connection_info object. Than connection will be established. Immediate or delayed - depends on the connection type:

  • ConnectionType.passive - immediate;
  • ConnectionType.active_connected - delayed. In both cases WorkerBase.on_connect will be called immediately after connection will be successfully established (IF it will be successfully established). :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param name: name of the connection. If you'll provide it - you will be able to find this connection in NetIOUserApi.connection_by_name dictionary by it's name. :return:
def add_connection( self, connection: Connection):
224    def add_connection(self, connection: Connection):
225        """
226        Will register already established connection. You need to use this method for example if you have already
227        connected socket
228        :param connection:
229        :return:
230        """
231        raise NotImplementedError()

Will register already established connection. You need to use this method for example if you have already connected socket :param connection: :return:

def remove_connection( self, connection: Connection):
233    def remove_connection(self, connection: Connection):
234        """
235        Will close and remove connection
236        :param connection:
237        :return:
238        """
239        raise NotImplementedError()

Will close and remove connection :param connection: :return:

def check_is_connection_need_to_sent_data( self, connection: Connection):
241    def check_is_connection_need_to_sent_data(self, connection: Connection):
242        """
243        Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call
244        by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that
245        connection by your self.
246        :param connection:
247        :return:
248        """
249        raise NotImplementedError()

Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that connection by your self. :param connection: :return:

class NetIOCallbacks:
252class NetIOCallbacks:
253    """
254    Callbacks from this class will be called from inside (and by) IOMethodBase loop.
255    """
256    def __init__(self):
257        super().__init__()
258
259    def on_accept_connection(self, connection: Connection):
260        raise NotImplementedError()
261
262    def on_connected(self, connection: Connection):
263        raise NotImplementedError()
264
265    def on_read(self, connection: Connection):
266        raise NotImplementedError()
267
268    def on_write(self, connection: Connection):
269        raise NotImplementedError()
270
271    def on_close(self, connection: Connection):
272        raise NotImplementedError()

Callbacks from this class will be called from inside (and by) IOMethodBase loop.

def on_accept_connection( self, connection: Connection):
259    def on_accept_connection(self, connection: Connection):
260        raise NotImplementedError()
def on_connected( self, connection: Connection):
262    def on_connected(self, connection: Connection):
263        raise NotImplementedError()
def on_read( self, connection: Connection):
265    def on_read(self, connection: Connection):
266        raise NotImplementedError()
def on_write( self, connection: Connection):
268    def on_write(self, connection: Connection):
269        raise NotImplementedError()
def on_close( self, connection: Connection):
271    def on_close(self, connection: Connection):
272        raise NotImplementedError()
class NetIOBase(NetIOUserApi, NetIOCallbacks):
275class NetIOBase(NetIOUserApi, NetIOCallbacks):
276    """
277    Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).
278    """
279    def __init__(self, transport):
280        """
281
282        :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
283        """
284        super().__init__()
285        self.method = transport(self)
286
287    def destroy(self):
288        raise NotImplementedError()

Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).

NetIOBase(transport)
279    def __init__(self, transport):
280        """
281
282        :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
283        """
284        super().__init__()
285        self.method = transport(self)

:param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself

method
def destroy(self):
287    def destroy(self):
288        raise NotImplementedError()
class WorkerBase:
291class WorkerBase:
292    """
293    Base class for all workers.
294    on_* callbacks will be called by the IO loop.
295
296    General info:
297    You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any
298    callback.
299    You can write output data (to be send) to self.connection at any time (see "Caution") from any callback.
300    """
301    def __init__(self, api: NetIOUserApi=None, connection: Connection=None):
302        """
303        Caution:
304        Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that
305        they will be set before any callback call, but not at the construction process.
306
307        :param api: link to the constructed network io object
308        :param connection: link to the constructed connection object
309        """
310        self.api = api
311        self.connection = connection
312
313    def on_connect(self):
314        """
315        Will be called after connection successfully established
316        :return:
317        """
318        pass
319
320    def on_read(self):
321        """
322        Will be called if there is some NEW data in the connection input buffer
323        :return:
324        """
325        pass
326
327    def on_no_more_data_to_write(self):
328        """
329        Will be called after all data is sent.
330        Normally it will be called once (one single shot after each portion of out data is sent).
331        If you'll set self.connection.force_write_call to True state - this callback may be called continuously
332        (but not guaranteed: it depends of used IOMethod implementation)
333        :return:
334        """
335        pass
336
337    def on_connection_lost(self):
338        """
339        Will be called AFTER connection socket was actually closed and removed from IOMethod checking list.
340        At this time, self.connection.connection_state is already set to ConnectionState.disconnected.
341        :return:
342        """
343        pass
344
345    def __copy__(self):
346        """
347        This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from
348        current object to that new instance. It will be called when new peer is connected to existing passive connection
349        So this is the way you may use to give all new connection some links to some global data by worker object of
350        the passive connection replication process.
351        :return:
352        """
353        raise NotImplementedError()

Base class for all workers. on_* callbacks will be called by the IO loop.

General info: You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any callback. You can write output data (to be send) to self.connection at any time (see "Caution") from any callback.

WorkerBase( api: NetIOUserApi = None, connection: Connection = None)
301    def __init__(self, api: NetIOUserApi=None, connection: Connection=None):
302        """
303        Caution:
304        Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that
305        they will be set before any callback call, but not at the construction process.
306
307        :param api: link to the constructed network io object
308        :param connection: link to the constructed connection object
309        """
310        self.api = api
311        self.connection = connection

Caution: Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that they will be set before any callback call, but not at the construction process.

:param api: link to the constructed network io object :param connection: link to the constructed connection object

api
connection
def on_connect(self):
313    def on_connect(self):
314        """
315        Will be called after connection successfully established
316        :return:
317        """
318        pass

Will be called after connection successfully established :return:

def on_read(self):
320    def on_read(self):
321        """
322        Will be called if there is some NEW data in the connection input buffer
323        :return:
324        """
325        pass

Will be called if there is some NEW data in the connection input buffer :return:

def on_no_more_data_to_write(self):
327    def on_no_more_data_to_write(self):
328        """
329        Will be called after all data is sent.
330        Normally it will be called once (one single shot after each portion of out data is sent).
331        If you'll set self.connection.force_write_call to True state - this callback may be called continuously
332        (but not guaranteed: it depends of used IOMethod implementation)
333        :return:
334        """
335        pass

Will be called after all data is sent. Normally it will be called once (one single shot after each portion of out data is sent). If you'll set self.connection.force_write_call to True state - this callback may be called continuously (but not guaranteed: it depends of used IOMethod implementation) :return:

def on_connection_lost(self):
337    def on_connection_lost(self):
338        """
339        Will be called AFTER connection socket was actually closed and removed from IOMethod checking list.
340        At this time, self.connection.connection_state is already set to ConnectionState.disconnected.
341        :return:
342        """
343        pass

Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. At this time, self.connection.connection_state is already set to ConnectionState.disconnected. :return:

class InlineWorkerBase:
356class InlineWorkerBase:
357    __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names',
358                 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately',
359                 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages',
360                 '__hold__client_id')
361
362    def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None,
363                 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None):
364        """
365
366        :param keyword: client keyword. You may check for a known keywords to act appropriately
367        :param socket_family:
368        :param socket_type:
369        :param socket_proto:
370        :param addr_info: result of socket.getaddrinfo() call
371        :param host_names: result of socket.gethostbyaddr() call
372        """
373        self.client_id = client_id
374        self.keyword = keyword
375        self.socket_family = socket_family
376        self.socket_type = socket_type
377        self.socket_proto = socket_proto
378        self.addr_info = addr_info
379        self.host_names = host_names
380        self.is_in_raw_mode = None
381
382        # self.output_messages = FIFODequeWithLengthControl()
383        self.output_messages = deque()
384        # self.output_messages = list()
385
386    def on__data_received(self, data: bytes):
387        """
388        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
389        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
390        be logged
391        :param data: piece of input data if connection is in RAW-mode and full message otherwise.
392        """
393        pass
394
395    def on__output_buffers_are_empty(self):
396        """
397        Will be called immediately when all output data was send.
398        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
399        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
400        be logged
401        """
402        pass
403
404    def on__connection_lost(self):
405        """
406        Will be called after connection was closed. Current Inline Processor object will be destroyed after this call.
407        Situation with unhandled exception will be logged.
408        """
409        pass
410
411    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
412        raise NotImplementedError()
413
414    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
415        raise NotImplementedError()
416
417    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
418        raise NotImplementedError()
419
420    def __getstate__(self):
421        raise NotImplementedError()
422
423    def __setstate__(self, state):
424        raise NotImplementedError()
InlineWorkerBase( client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, addr_info=None, host_names=None, external_parameters_set_trigger: Set = None)
362    def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None,
363                 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None):
364        """
365
366        :param keyword: client keyword. You may check for a known keywords to act appropriately
367        :param socket_family:
368        :param socket_type:
369        :param socket_proto:
370        :param addr_info: result of socket.getaddrinfo() call
371        :param host_names: result of socket.gethostbyaddr() call
372        """
373        self.client_id = client_id
374        self.keyword = keyword
375        self.socket_family = socket_family
376        self.socket_type = socket_type
377        self.socket_proto = socket_proto
378        self.addr_info = addr_info
379        self.host_names = host_names
380        self.is_in_raw_mode = None
381
382        # self.output_messages = FIFODequeWithLengthControl()
383        self.output_messages = deque()
384        # self.output_messages = list()

:param keyword: client keyword. You may check for a known keywords to act appropriately :param socket_family: :param socket_type: :param socket_proto: :param addr_info: result of socket.getaddrinfo() call :param host_names: result of socket.gethostbyaddr() call

client_id
keyword
socket_family
socket_type
socket_proto
addr_info
host_names
is_in_raw_mode
output_messages
def on__data_received(self, data: bytes):
386    def on__data_received(self, data: bytes):
387        """
388        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
389        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
390        be logged
391        :param data: piece of input data if connection is in RAW-mode and full message otherwise.
392        """
393        pass

Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged :param data: piece of input data if connection is in RAW-mode and full message otherwise.

def on__output_buffers_are_empty(self):
395    def on__output_buffers_are_empty(self):
396        """
397        Will be called immediately when all output data was send.
398        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
399        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will
400        be logged
401        """
402        pass

Will be called immediately when all output data was send. Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged

def on__connection_lost(self):
404    def on__connection_lost(self):
405        """
406        Will be called after connection was closed. Current Inline Processor object will be destroyed after this call.
407        Situation with unhandled exception will be logged.
408        """
409        pass

Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. Situation with unhandled exception will be logged.

def set__is_in_raw_mode(self, is_in_raw_mode: bool):
411    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
412        raise NotImplementedError()
def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
414    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
415        raise NotImplementedError()
def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
417    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
418        raise NotImplementedError()
@contextmanager
def net_io(net_io_obj: NetIOBase):
427@contextmanager
428def net_io(net_io_obj: NetIOBase):
429    """
430    Context manager.
431    Usage:
432
433    main_io = NetIOBase()
434    with(net_io(main_io)) as io:
435        print('Preparing connections')
436        connection1 = io.make_connection(...)
437        connection2 = io.make_connection(...)
438        k = c + 12
439        ...
440        connectionN = io.make_connection(...)
441        print('Starting IO loop')
442    print('IO loop was finished properly')
443
444
445    :param net_io_obj: constructed IO instance (object)
446    :return:
447    """
448    try:
449        yield net_io_obj
450        net_io_obj.start(destroy_on_finish=False)
451    finally:
452        net_io_obj.destroy()

Context manager. Usage:

main_io = NetIOBase() with(net_io(main_io)) as io: print('Preparing connections') connection1 = io.make_connection(...) connection2 = io.make_connection(...) k = c + 12 ... connectionN = io.make_connection(...) print('Starting IO loop') print('IO loop was finished properly')

:param net_io_obj: constructed IO instance (object) :return:

class IOLoopBase:
455class IOLoopBase:
456    """
457    Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.)
458    All his methods are called by the NetIOBase instance.
459    """
460    def __init__(self, interface: NetIOBase):
461        self.interface = interface
462        self.should_be_closed = set()
463        pass
464
465    def loop_iteration(self, timeout=-1):
466        """
467        Single IO loop iteration.
468        This method holds all IOMethod logic.
469        :param timeout: float or int. If timeout is negative, the call will block until there is an event.
470        :return:
471        """
472        raise NotImplementedError()
473
474    def destroy(self):
475        """
476        Initiates destruction process
477        :return:
478        """
479        raise NotImplementedError()
480
481    def set__can_read(self, conn: socket.socket, state=True):
482        """
483        Will allow (True) or disallow (False) "socket available to read" checks for socket
484        :param conn: target socket
485        :param state: True - allow; False - disallow
486        :return:
487        """
488        raise NotImplementedError()
489
490    def set__need_write(self, conn: socket.socket, state=True):
491        """
492        Will allow (True) or disallow (False) "socket available to write" checks for socket
493        :param conn: target socket
494        :param state: True - allow; False - disallow
495        :return:
496        """
497        raise NotImplementedError()
498
499    def set__should_be_closed(self, conn: socket.socket):
500        """
501        Mark socket as "should be closed"
502        :param conn: target socket
503        :return:
504        """
505        raise NotImplementedError()
506
507    def add_connection(self, conn: socket.socket):
508        """
509        Will add socket to the internal connections list
510        :param conn: target socket
511        :return:
512        """
513        raise NotImplementedError()
514
515    def remove_connection(self, conn: socket.socket):
516        """
517        Will remove socket from the internal connections list
518        :param conn: target socket
519        :return:
520        """
521        raise NotImplementedError()

Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.

IOLoopBase(interface: NetIOBase)
460    def __init__(self, interface: NetIOBase):
461        self.interface = interface
462        self.should_be_closed = set()
463        pass
interface
should_be_closed
def loop_iteration(self, timeout=-1):
465    def loop_iteration(self, timeout=-1):
466        """
467        Single IO loop iteration.
468        This method holds all IOMethod logic.
469        :param timeout: float or int. If timeout is negative, the call will block until there is an event.
470        :return:
471        """
472        raise NotImplementedError()

Single IO loop iteration. This method holds all IOMethod logic. :param timeout: float or int. If timeout is negative, the call will block until there is an event. :return:

def destroy(self):
474    def destroy(self):
475        """
476        Initiates destruction process
477        :return:
478        """
479        raise NotImplementedError()

Initiates destruction process :return:

def set__can_read(self, conn: socket.socket, state=True):
481    def set__can_read(self, conn: socket.socket, state=True):
482        """
483        Will allow (True) or disallow (False) "socket available to read" checks for socket
484        :param conn: target socket
485        :param state: True - allow; False - disallow
486        :return:
487        """
488        raise NotImplementedError()

Will allow (True) or disallow (False) "socket available to read" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__need_write(self, conn: socket.socket, state=True):
490    def set__need_write(self, conn: socket.socket, state=True):
491        """
492        Will allow (True) or disallow (False) "socket available to write" checks for socket
493        :param conn: target socket
494        :param state: True - allow; False - disallow
495        :return:
496        """
497        raise NotImplementedError()

Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__should_be_closed(self, conn: socket.socket):
499    def set__should_be_closed(self, conn: socket.socket):
500        """
501        Mark socket as "should be closed"
502        :param conn: target socket
503        :return:
504        """
505        raise NotImplementedError()

Mark socket as "should be closed" :param conn: target socket :return:

def add_connection(self, conn: socket.socket):
507    def add_connection(self, conn: socket.socket):
508        """
509        Will add socket to the internal connections list
510        :param conn: target socket
511        :return:
512        """
513        raise NotImplementedError()

Will add socket to the internal connections list :param conn: target socket :return:

def remove_connection(self, conn: socket.socket):
515    def remove_connection(self, conn: socket.socket):
516        """
517        Will remove socket from the internal connections list
518        :param conn: target socket
519        :return:
520        """
521        raise NotImplementedError()

Will remove socket from the internal connections list :param conn: target socket :return: