cengal.io.net_io.versions.v_0.net_io_abstract
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import socket 19import errno 20import copy 21import enum 22from contextlib import contextmanager 23 24""" 25Module Docstring 26Docstrings: http://www.python.org/dev/peps/pep-0257/ 27""" 28 29__author__ = "ButenkoMS <gtalk@butenkoms.space>" 30__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 31__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 32__license__ = "Apache License, Version 2.0" 33__version__ = "4.4.1" 34__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 35__email__ = "gtalk@butenkoms.space" 36# __status__ = "Prototype" 37__status__ = "Development" 38# __status__ = "Production" 39 40 41class LoopIsAlreadyBegun(Exception): 42 """ 43 You can not run NetIOUserApi.start() if it was already started (and still running). 44 """ 45 pass 46 47 48class WrongConnectionType(Exception): 49 """ 50 You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of 51 connections are made only from (and by) inside of IO loop and logic itself. 52 """ 53 pass 54 55 56class CanNotMakeConnection(Exception): 57 """ 58 Currently not used. If there will be some exception on connect() call - it will be raised without any changes. 59 """ 60 pass 61 62 63class ConnectionType(enum.Enum): 64 passive = 0 # passive socket (bind()) 65 active_accepted = 1 # active accepted socket (accept()) 66 active_connected = 2 # active connected socket (connect()) 67 68 69class ConnectionState(enum.Enum): 70 not_connected_yet = 0 # socket is not in connection process 71 waiting_for_connection = 1 # socket is in connection process (async connection is delayed) 72 connected = 2 # socket is successfully connected 73 worker_fault = 3 # there was unhandled exception from one of the WorkerBase callbacks 74 io_fault = 4 # there was some IO trouble 75 waiting_for_disconnection = 5 # connection was marked as "should be closed" but was not closed yet 76 disconnected = 6 # socket is closed 77 78 79class ConnectionInfo: 80 def __init__(self, 81 worker_obj, 82 connection_type: ConnectionType, 83 socket_address=None, 84 socket_family=socket.AF_INET, 85 socket_type=socket.SOCK_STREAM, 86 socket_protocol=0, 87 socket_fileno=None, 88 backlog=0): 89 """ 90 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 91 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 92 by copy.copy() call (see WorkerBase.__copy__() method for more info) 93 :param connection_type: see ConnectionType() description 94 :param socket_address: see socket.bind()/socket.connect() docs 95 :param socket_family: see socket.socket() docs 96 :param socket_type: see socket.socket() docs 97 :param socket_protocol: see socket.socket() docs 98 :param socket_fileno: see socket.socket() docs 99 :param backlog: see socket.listen() docs 100 """ 101 self.worker_obj = worker_obj 102 self.connection_type = connection_type 103 self.socket_address = socket_address 104 self.socket_family = socket_family 105 self.socket_type = socket_type 106 self.socket_protocol = socket_protocol 107 self.socket_fileno = socket_fileno 108 self.backlog = backlog 109 110 111class Connection: 112 """ 113 Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself 114 """ 115 def __init__(self, 116 connection_id, 117 connection_info: ConnectionInfo, 118 connection_and_address_pair: tuple, 119 connection_state: ConnectionState, 120 connection_name=None, 121 ): 122 """ 123 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 124 GUID if you are creating connection by your self. 125 :param connection_info: new connection will be created using information provided in connection_info object. 126 See ConnectionInfo() for more information 127 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 128 is in the process of connection. But only if it was made by IO loop.). 129 :param connection_state: see ConnectionState for more information 130 :param connection_name: name of the connection (if it was provided) 131 """ 132 self.connection_id = connection_id 133 self.connection_info = connection_info 134 self.conn, self.address = connection_and_address_pair 135 self.connection_state = connection_state 136 self.connection_name = connection_name 137 self.worker_obj = connection_info.worker_obj 138 self.read_data = b'' # already read data 139 self.must_be_written_data = memoryview(b'') # this data should be written 140 self.force_write_call = False 141 142 def add_must_be_written_data(self, data): 143 """ 144 Use this method to add data to output buffers 145 :param data: some new output data to be send through this connection 146 :return: 147 """ 148 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data) 149 150 151class NetIOUserApi: 152 """ 153 You may rely and use freely use methods of this base class from inside your program or from inside your worker 154 (WorkerBase). 155 """ 156 def __init__(self): 157 super().__init__() 158 self.all_connections = set() 159 self.passive_connections = set() 160 161 self.connection_by_id = dict() 162 self.connection_by_name = dict() 163 self.connection_by_fileno = dict() 164 165 def start(self, destroy_on_finish=True): 166 """ 167 Will start IO loop 168 :param destroy_on_finish: if True - destroy() will be called from inside of this method 169 :return: 170 """ 171 raise NotImplementedError() 172 173 def stop(self): 174 """ 175 Will initiate IO loop stop process 176 :return: 177 """ 178 raise NotImplementedError() 179 180 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 181 """ 182 Will create connection from given connection_info object. Than connection will be established. Immediate or 183 delayed - depends on the connection type: 184 - ConnectionType.passive - immediate; 185 - ConnectionType.active_connected - delayed. 186 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 187 established (IF it will be successfully established). 188 :param connection_info: new connection will be created using information provided in connection_info object. 189 See ConnectionInfo() for more information 190 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 191 NetIOUserApi.connection_by_name dictionary by it's name. 192 :return: 193 """ 194 raise NotImplementedError() 195 196 def add_connection(self, connection: Connection): 197 """ 198 Will register already established connection. You need to use this method for example if you have already 199 connected socket 200 :param connection: 201 :return: 202 """ 203 raise NotImplementedError() 204 205 def remove_connection(self, connection: Connection): 206 """ 207 Will close and remove connection 208 :param connection: 209 :return: 210 """ 211 raise NotImplementedError() 212 213 def check_is_connection_need_to_sent_data(self, connection: Connection): 214 """ 215 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 216 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 217 connection by your self. 218 :param connection: 219 :return: 220 """ 221 raise NotImplementedError() 222 223 224class NetIOCallbacks: 225 """ 226 Callbacks from this class will be called from inside (and by) IOMethodBase loop. 227 """ 228 def __init__(self): 229 super().__init__() 230 231 def on_accept_connection(self, connection: Connection): 232 raise NotImplementedError() 233 234 def on_connected(self, connection: Connection): 235 raise NotImplementedError() 236 237 def on_read(self, connection: Connection): 238 raise NotImplementedError() 239 240 def on_write(self, connection: Connection): 241 raise NotImplementedError() 242 243 def on_close(self, connection: Connection): 244 raise NotImplementedError() 245 246 247class NetIOBase(NetIOUserApi, NetIOCallbacks): 248 """ 249 Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.). 250 """ 251 def __init__(self, transport): 252 """ 253 254 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 255 """ 256 super().__init__() 257 self.method = transport(self) 258 259 def destroy(self): 260 raise NotImplementedError() 261 262 263class WorkerBase: 264 """ 265 Base class for all workers. 266 on_* callbacks will be called by the IO loop. 267 268 General info: 269 You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any 270 callback. 271 You can write output data (to be send) to self.connection at any time (see "Caution") from any callback. 272 """ 273 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 274 """ 275 Caution: 276 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 277 they will be set before any callback call, but not at the construction process. 278 279 :param api: link to the constructed network io object 280 :param connection: link to the constructed connection object 281 """ 282 self.api = api 283 self.connection = connection 284 285 def on_connect(self): 286 """ 287 Will be called after connection successfully established 288 :return: 289 """ 290 pass 291 292 def on_read(self): 293 """ 294 Will be called if there is some NEW data in the connection input buffer 295 :return: 296 """ 297 pass 298 299 def on_no_more_data_to_write(self): 300 """ 301 Will be called after all data is sent. 302 Normally it will be called once (one single shot after each portion of out data is sent). 303 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 304 (but not guaranteed: it depends of used IOMethod implementation) 305 :return: 306 """ 307 pass 308 309 def on_connection_lost(self): 310 """ 311 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 312 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 313 :return: 314 """ 315 pass 316 317 def __copy__(self): 318 """ 319 This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from 320 current object to that new instance. It will be called when new peer is connected to existing passive connection 321 So this is the way you may use to give all new connection some links to some global data by worker object of 322 the passive connection replication process. 323 :return: 324 """ 325 raise NotImplementedError() 326 327 328@contextmanager 329def net_io(net_io_obj: NetIOBase): 330 """ 331 Context manager. 332 Usage: 333 334 main_io = NetIOBase() 335 with(net_io(main_io)) as io: 336 print('Preparing connections') 337 connection1 = io.make_connection(...) 338 connection2 = io.make_connection(...) 339 k = c + 12 340 ... 341 connectionN = io.make_connection(...) 342 print('Starting IO loop') 343 print('IO loop was finished properly') 344 345 346 :param net_io_obj: constructed IO instance (object) 347 :return: 348 """ 349 try: 350 yield net_io_obj 351 net_io_obj.start(destroy_on_finish=False) 352 finally: 353 net_io_obj.destroy() 354 355 356class IOMethodBase: 357 """ 358 Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) 359 All his methods are called by the NetIOBase instance. 360 """ 361 def __init__(self, interface: NetIOBase): 362 self.interface = interface 363 self.should_be_closed = set() 364 pass 365 366 def loop_iteration(self, timeout=None): 367 """ 368 Single IO loop iteration. 369 This method holds all IOMethod logic. 370 :return: 371 """ 372 raise NotImplementedError() 373 374 def destroy(self): 375 """ 376 Initiates destruction process 377 :return: 378 """ 379 raise NotImplementedError() 380 381 def set__can_read(self, conn: socket.socket, state=True): 382 """ 383 Will allow (True) or disallow (False) "socket available to read" checks for socket 384 :param conn: target socket 385 :param state: True - allow; False - disallow 386 :return: 387 """ 388 raise NotImplementedError() 389 390 def set__need_write(self, conn: socket.socket, state=True): 391 """ 392 Will allow (True) or disallow (False) "socket available to write" checks for socket 393 :param conn: target socket 394 :param state: True - allow; False - disallow 395 :return: 396 """ 397 raise NotImplementedError() 398 399 def set__should_be_closed(self, conn: socket.socket): 400 """ 401 Mark socket as "should be closed" 402 :param conn: target socket 403 :return: 404 """ 405 raise NotImplementedError() 406 407 def add_connection(self, conn: socket.socket): 408 """ 409 Will add socket to the internal connections list 410 :param conn: target socket 411 :return: 412 """ 413 raise NotImplementedError() 414 415 def remove_connection(self, conn: socket.socket): 416 """ 417 Will remove socket from the internal connections list 418 :param conn: target socket 419 :return: 420 """ 421 raise NotImplementedError()
42class LoopIsAlreadyBegun(Exception): 43 """ 44 You can not run NetIOUserApi.start() if it was already started (and still running). 45 """ 46 pass
You can not run NetIOUserApi.start() if it was already started (and still running).
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
49class WrongConnectionType(Exception): 50 """ 51 You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of 52 connections are made only from (and by) inside of IO loop and logic itself. 53 """ 54 pass
You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of connections are made only from (and by) inside of IO loop and logic itself.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
57class CanNotMakeConnection(Exception): 58 """ 59 Currently not used. If there will be some exception on connect() call - it will be raised without any changes. 60 """ 61 pass
Currently not used. If there will be some exception on connect() call - it will be raised without any changes.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
64class ConnectionType(enum.Enum): 65 passive = 0 # passive socket (bind()) 66 active_accepted = 1 # active accepted socket (accept()) 67 active_connected = 2 # active connected socket (connect())
An enumeration.
Inherited Members
- enum.Enum
- name
- value
70class ConnectionState(enum.Enum): 71 not_connected_yet = 0 # socket is not in connection process 72 waiting_for_connection = 1 # socket is in connection process (async connection is delayed) 73 connected = 2 # socket is successfully connected 74 worker_fault = 3 # there was unhandled exception from one of the WorkerBase callbacks 75 io_fault = 4 # there was some IO trouble 76 waiting_for_disconnection = 5 # connection was marked as "should be closed" but was not closed yet 77 disconnected = 6 # socket is closed
An enumeration.
Inherited Members
- enum.Enum
- name
- value
80class ConnectionInfo: 81 def __init__(self, 82 worker_obj, 83 connection_type: ConnectionType, 84 socket_address=None, 85 socket_family=socket.AF_INET, 86 socket_type=socket.SOCK_STREAM, 87 socket_protocol=0, 88 socket_fileno=None, 89 backlog=0): 90 """ 91 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 92 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 93 by copy.copy() call (see WorkerBase.__copy__() method for more info) 94 :param connection_type: see ConnectionType() description 95 :param socket_address: see socket.bind()/socket.connect() docs 96 :param socket_family: see socket.socket() docs 97 :param socket_type: see socket.socket() docs 98 :param socket_protocol: see socket.socket() docs 99 :param socket_fileno: see socket.socket() docs 100 :param backlog: see socket.listen() docs 101 """ 102 self.worker_obj = worker_obj 103 self.connection_type = connection_type 104 self.socket_address = socket_address 105 self.socket_family = socket_family 106 self.socket_type = socket_type 107 self.socket_protocol = socket_protocol 108 self.socket_fileno = socket_fileno 109 self.backlog = backlog
81 def __init__(self, 82 worker_obj, 83 connection_type: ConnectionType, 84 socket_address=None, 85 socket_family=socket.AF_INET, 86 socket_type=socket.SOCK_STREAM, 87 socket_protocol=0, 88 socket_fileno=None, 89 backlog=0): 90 """ 91 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 92 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 93 by copy.copy() call (see WorkerBase.__copy__() method for more info) 94 :param connection_type: see ConnectionType() description 95 :param socket_address: see socket.bind()/socket.connect() docs 96 :param socket_family: see socket.socket() docs 97 :param socket_type: see socket.socket() docs 98 :param socket_protocol: see socket.socket() docs 99 :param socket_fileno: see socket.socket() docs 100 :param backlog: see socket.listen() docs 101 """ 102 self.worker_obj = worker_obj 103 self.connection_type = connection_type 104 self.socket_address = socket_address 105 self.socket_family = socket_family 106 self.socket_type = socket_type 107 self.socket_protocol = socket_protocol 108 self.socket_fileno = socket_fileno 109 self.backlog = backlog
:param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive connection - it (worker_obj) will be inherited by the descendant active_accepted connections by copy.copy() call (see WorkerBase.__copy__() method for more info) :param connection_type: see ConnectionType() description :param socket_address: see socket.bind()/socket.connect() docs :param socket_family: see socket.socket() docs :param socket_type: see socket.socket() docs :param socket_protocol: see socket.socket() docs :param socket_fileno: see socket.socket() docs :param backlog: see socket.listen() docs
112class Connection: 113 """ 114 Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself 115 """ 116 def __init__(self, 117 connection_id, 118 connection_info: ConnectionInfo, 119 connection_and_address_pair: tuple, 120 connection_state: ConnectionState, 121 connection_name=None, 122 ): 123 """ 124 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 125 GUID if you are creating connection by your self. 126 :param connection_info: new connection will be created using information provided in connection_info object. 127 See ConnectionInfo() for more information 128 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 129 is in the process of connection. But only if it was made by IO loop.). 130 :param connection_state: see ConnectionState for more information 131 :param connection_name: name of the connection (if it was provided) 132 """ 133 self.connection_id = connection_id 134 self.connection_info = connection_info 135 self.conn, self.address = connection_and_address_pair 136 self.connection_state = connection_state 137 self.connection_name = connection_name 138 self.worker_obj = connection_info.worker_obj 139 self.read_data = b'' # already read data 140 self.must_be_written_data = memoryview(b'') # this data should be written 141 self.force_write_call = False 142 143 def add_must_be_written_data(self, data): 144 """ 145 Use this method to add data to output buffers 146 :param data: some new output data to be send through this connection 147 :return: 148 """ 149 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)
Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself
116 def __init__(self, 117 connection_id, 118 connection_info: ConnectionInfo, 119 connection_and_address_pair: tuple, 120 connection_state: ConnectionState, 121 connection_name=None, 122 ): 123 """ 124 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 125 GUID if you are creating connection by your self. 126 :param connection_info: new connection will be created using information provided in connection_info object. 127 See ConnectionInfo() for more information 128 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 129 is in the process of connection. But only if it was made by IO loop.). 130 :param connection_state: see ConnectionState for more information 131 :param connection_name: name of the connection (if it was provided) 132 """ 133 self.connection_id = connection_id 134 self.connection_info = connection_info 135 self.conn, self.address = connection_and_address_pair 136 self.connection_state = connection_state 137 self.connection_name = connection_name 138 self.worker_obj = connection_info.worker_obj 139 self.read_data = b'' # already read data 140 self.must_be_written_data = memoryview(b'') # this data should be written 141 self.force_write_call = False
:param connection_id: unique connection identificator (unique within the IO object). You may use some random GUID if you are creating connection by your self. :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket is in the process of connection. But only if it was made by IO loop.). :param connection_state: see ConnectionState for more information :param connection_name: name of the connection (if it was provided)
143 def add_must_be_written_data(self, data): 144 """ 145 Use this method to add data to output buffers 146 :param data: some new output data to be send through this connection 147 :return: 148 """ 149 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)
Use this method to add data to output buffers :param data: some new output data to be send through this connection :return:
152class NetIOUserApi: 153 """ 154 You may rely and use freely use methods of this base class from inside your program or from inside your worker 155 (WorkerBase). 156 """ 157 def __init__(self): 158 super().__init__() 159 self.all_connections = set() 160 self.passive_connections = set() 161 162 self.connection_by_id = dict() 163 self.connection_by_name = dict() 164 self.connection_by_fileno = dict() 165 166 def start(self, destroy_on_finish=True): 167 """ 168 Will start IO loop 169 :param destroy_on_finish: if True - destroy() will be called from inside of this method 170 :return: 171 """ 172 raise NotImplementedError() 173 174 def stop(self): 175 """ 176 Will initiate IO loop stop process 177 :return: 178 """ 179 raise NotImplementedError() 180 181 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 182 """ 183 Will create connection from given connection_info object. Than connection will be established. Immediate or 184 delayed - depends on the connection type: 185 - ConnectionType.passive - immediate; 186 - ConnectionType.active_connected - delayed. 187 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 188 established (IF it will be successfully established). 189 :param connection_info: new connection will be created using information provided in connection_info object. 190 See ConnectionInfo() for more information 191 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 192 NetIOUserApi.connection_by_name dictionary by it's name. 193 :return: 194 """ 195 raise NotImplementedError() 196 197 def add_connection(self, connection: Connection): 198 """ 199 Will register already established connection. You need to use this method for example if you have already 200 connected socket 201 :param connection: 202 :return: 203 """ 204 raise NotImplementedError() 205 206 def remove_connection(self, connection: Connection): 207 """ 208 Will close and remove connection 209 :param connection: 210 :return: 211 """ 212 raise NotImplementedError() 213 214 def check_is_connection_need_to_sent_data(self, connection: Connection): 215 """ 216 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 217 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 218 connection by your self. 219 :param connection: 220 :return: 221 """ 222 raise NotImplementedError()
You may rely and use freely use methods of this base class from inside your program or from inside your worker (WorkerBase).
166 def start(self, destroy_on_finish=True): 167 """ 168 Will start IO loop 169 :param destroy_on_finish: if True - destroy() will be called from inside of this method 170 :return: 171 """ 172 raise NotImplementedError()
Will start IO loop :param destroy_on_finish: if True - destroy() will be called from inside of this method :return:
174 def stop(self): 175 """ 176 Will initiate IO loop stop process 177 :return: 178 """ 179 raise NotImplementedError()
Will initiate IO loop stop process :return:
181 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 182 """ 183 Will create connection from given connection_info object. Than connection will be established. Immediate or 184 delayed - depends on the connection type: 185 - ConnectionType.passive - immediate; 186 - ConnectionType.active_connected - delayed. 187 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 188 established (IF it will be successfully established). 189 :param connection_info: new connection will be created using information provided in connection_info object. 190 See ConnectionInfo() for more information 191 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 192 NetIOUserApi.connection_by_name dictionary by it's name. 193 :return: 194 """ 195 raise NotImplementedError()
Will create connection from given connection_info object. Than connection will be established. Immediate or delayed - depends on the connection type:
- ConnectionType.passive - immediate;
- ConnectionType.active_connected - delayed. In both cases WorkerBase.on_connect will be called immediately after connection will be successfully established (IF it will be successfully established). :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param name: name of the connection. If you'll provide it - you will be able to find this connection in NetIOUserApi.connection_by_name dictionary by it's name. :return:
197 def add_connection(self, connection: Connection): 198 """ 199 Will register already established connection. You need to use this method for example if you have already 200 connected socket 201 :param connection: 202 :return: 203 """ 204 raise NotImplementedError()
Will register already established connection. You need to use this method for example if you have already connected socket :param connection: :return:
206 def remove_connection(self, connection: Connection): 207 """ 208 Will close and remove connection 209 :param connection: 210 :return: 211 """ 212 raise NotImplementedError()
Will close and remove connection :param connection: :return:
214 def check_is_connection_need_to_sent_data(self, connection: Connection): 215 """ 216 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 217 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 218 connection by your self. 219 :param connection: 220 :return: 221 """ 222 raise NotImplementedError()
Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that connection by your self. :param connection: :return:
225class NetIOCallbacks: 226 """ 227 Callbacks from this class will be called from inside (and by) IOMethodBase loop. 228 """ 229 def __init__(self): 230 super().__init__() 231 232 def on_accept_connection(self, connection: Connection): 233 raise NotImplementedError() 234 235 def on_connected(self, connection: Connection): 236 raise NotImplementedError() 237 238 def on_read(self, connection: Connection): 239 raise NotImplementedError() 240 241 def on_write(self, connection: Connection): 242 raise NotImplementedError() 243 244 def on_close(self, connection: Connection): 245 raise NotImplementedError()
Callbacks from this class will be called from inside (and by) IOMethodBase loop.
248class NetIOBase(NetIOUserApi, NetIOCallbacks): 249 """ 250 Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.). 251 """ 252 def __init__(self, transport): 253 """ 254 255 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 256 """ 257 super().__init__() 258 self.method = transport(self) 259 260 def destroy(self): 261 raise NotImplementedError()
Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).
252 def __init__(self, transport): 253 """ 254 255 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 256 """ 257 super().__init__() 258 self.method = transport(self)
:param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
264class WorkerBase: 265 """ 266 Base class for all workers. 267 on_* callbacks will be called by the IO loop. 268 269 General info: 270 You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any 271 callback. 272 You can write output data (to be send) to self.connection at any time (see "Caution") from any callback. 273 """ 274 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 275 """ 276 Caution: 277 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 278 they will be set before any callback call, but not at the construction process. 279 280 :param api: link to the constructed network io object 281 :param connection: link to the constructed connection object 282 """ 283 self.api = api 284 self.connection = connection 285 286 def on_connect(self): 287 """ 288 Will be called after connection successfully established 289 :return: 290 """ 291 pass 292 293 def on_read(self): 294 """ 295 Will be called if there is some NEW data in the connection input buffer 296 :return: 297 """ 298 pass 299 300 def on_no_more_data_to_write(self): 301 """ 302 Will be called after all data is sent. 303 Normally it will be called once (one single shot after each portion of out data is sent). 304 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 305 (but not guaranteed: it depends of used IOMethod implementation) 306 :return: 307 """ 308 pass 309 310 def on_connection_lost(self): 311 """ 312 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 313 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 314 :return: 315 """ 316 pass 317 318 def __copy__(self): 319 """ 320 This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from 321 current object to that new instance. It will be called when new peer is connected to existing passive connection 322 So this is the way you may use to give all new connection some links to some global data by worker object of 323 the passive connection replication process. 324 :return: 325 """ 326 raise NotImplementedError()
Base class for all workers. on_* callbacks will be called by the IO loop.
General info: You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any callback. You can write output data (to be send) to self.connection at any time (see "Caution") from any callback.
274 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 275 """ 276 Caution: 277 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 278 they will be set before any callback call, but not at the construction process. 279 280 :param api: link to the constructed network io object 281 :param connection: link to the constructed connection object 282 """ 283 self.api = api 284 self.connection = connection
Caution: Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that they will be set before any callback call, but not at the construction process.
:param api: link to the constructed network io object :param connection: link to the constructed connection object
286 def on_connect(self): 287 """ 288 Will be called after connection successfully established 289 :return: 290 """ 291 pass
Will be called after connection successfully established :return:
293 def on_read(self): 294 """ 295 Will be called if there is some NEW data in the connection input buffer 296 :return: 297 """ 298 pass
Will be called if there is some NEW data in the connection input buffer :return:
300 def on_no_more_data_to_write(self): 301 """ 302 Will be called after all data is sent. 303 Normally it will be called once (one single shot after each portion of out data is sent). 304 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 305 (but not guaranteed: it depends of used IOMethod implementation) 306 :return: 307 """ 308 pass
Will be called after all data is sent. Normally it will be called once (one single shot after each portion of out data is sent). If you'll set self.connection.force_write_call to True state - this callback may be called continuously (but not guaranteed: it depends of used IOMethod implementation) :return:
310 def on_connection_lost(self): 311 """ 312 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 313 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 314 :return: 315 """ 316 pass
Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. At this time, self.connection.connection_state is already set to ConnectionState.disconnected. :return:
329@contextmanager 330def net_io(net_io_obj: NetIOBase): 331 """ 332 Context manager. 333 Usage: 334 335 main_io = NetIOBase() 336 with(net_io(main_io)) as io: 337 print('Preparing connections') 338 connection1 = io.make_connection(...) 339 connection2 = io.make_connection(...) 340 k = c + 12 341 ... 342 connectionN = io.make_connection(...) 343 print('Starting IO loop') 344 print('IO loop was finished properly') 345 346 347 :param net_io_obj: constructed IO instance (object) 348 :return: 349 """ 350 try: 351 yield net_io_obj 352 net_io_obj.start(destroy_on_finish=False) 353 finally: 354 net_io_obj.destroy()
Context manager. Usage:
main_io = NetIOBase() with(net_io(main_io)) as io: print('Preparing connections') connection1 = io.make_connection(...) connection2 = io.make_connection(...) k = c + 12 ... connectionN = io.make_connection(...) print('Starting IO loop') print('IO loop was finished properly')
:param net_io_obj: constructed IO instance (object) :return:
357class IOMethodBase: 358 """ 359 Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) 360 All his methods are called by the NetIOBase instance. 361 """ 362 def __init__(self, interface: NetIOBase): 363 self.interface = interface 364 self.should_be_closed = set() 365 pass 366 367 def loop_iteration(self, timeout=None): 368 """ 369 Single IO loop iteration. 370 This method holds all IOMethod logic. 371 :return: 372 """ 373 raise NotImplementedError() 374 375 def destroy(self): 376 """ 377 Initiates destruction process 378 :return: 379 """ 380 raise NotImplementedError() 381 382 def set__can_read(self, conn: socket.socket, state=True): 383 """ 384 Will allow (True) or disallow (False) "socket available to read" checks for socket 385 :param conn: target socket 386 :param state: True - allow; False - disallow 387 :return: 388 """ 389 raise NotImplementedError() 390 391 def set__need_write(self, conn: socket.socket, state=True): 392 """ 393 Will allow (True) or disallow (False) "socket available to write" checks for socket 394 :param conn: target socket 395 :param state: True - allow; False - disallow 396 :return: 397 """ 398 raise NotImplementedError() 399 400 def set__should_be_closed(self, conn: socket.socket): 401 """ 402 Mark socket as "should be closed" 403 :param conn: target socket 404 :return: 405 """ 406 raise NotImplementedError() 407 408 def add_connection(self, conn: socket.socket): 409 """ 410 Will add socket to the internal connections list 411 :param conn: target socket 412 :return: 413 """ 414 raise NotImplementedError() 415 416 def remove_connection(self, conn: socket.socket): 417 """ 418 Will remove socket from the internal connections list 419 :param conn: target socket 420 :return: 421 """ 422 raise NotImplementedError()
Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.
367 def loop_iteration(self, timeout=None): 368 """ 369 Single IO loop iteration. 370 This method holds all IOMethod logic. 371 :return: 372 """ 373 raise NotImplementedError()
Single IO loop iteration. This method holds all IOMethod logic. :return:
375 def destroy(self): 376 """ 377 Initiates destruction process 378 :return: 379 """ 380 raise NotImplementedError()
Initiates destruction process :return:
382 def set__can_read(self, conn: socket.socket, state=True): 383 """ 384 Will allow (True) or disallow (False) "socket available to read" checks for socket 385 :param conn: target socket 386 :param state: True - allow; False - disallow 387 :return: 388 """ 389 raise NotImplementedError()
Will allow (True) or disallow (False) "socket available to read" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
391 def set__need_write(self, conn: socket.socket, state=True): 392 """ 393 Will allow (True) or disallow (False) "socket available to write" checks for socket 394 :param conn: target socket 395 :param state: True - allow; False - disallow 396 :return: 397 """ 398 raise NotImplementedError()
Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
400 def set__should_be_closed(self, conn: socket.socket): 401 """ 402 Mark socket as "should be closed" 403 :param conn: target socket 404 :return: 405 """ 406 raise NotImplementedError()
Mark socket as "should be closed" :param conn: target socket :return:
408 def add_connection(self, conn: socket.socket): 409 """ 410 Will add socket to the internal connections list 411 :param conn: target socket 412 :return: 413 """ 414 raise NotImplementedError()
Will add socket to the internal connections list :param conn: target socket :return:
416 def remove_connection(self, conn: socket.socket): 417 """ 418 Will remove socket from the internal connections list 419 :param conn: target socket 420 :return: 421 """ 422 raise NotImplementedError()
Will remove socket from the internal connections list :param conn: target socket :return: