cengal.io.asock_io.versions.v_1.abstract
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import socket 19import errno 20import copy 21import enum 22from contextlib import contextmanager 23from collections import deque 24from typing import Set, Iterable 25 26""" 27Module Docstring 28Docstrings: http://www.python.org/dev/peps/pep-0257/ 29""" 30 31__author__ = "ButenkoMS <gtalk@butenkoms.space>" 32__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 33__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 34__license__ = "Apache License, Version 2.0" 35__version__ = "4.4.1" 36__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 37__email__ = "gtalk@butenkoms.space" 38# __status__ = "Prototype" 39__status__ = "Development" 40# __status__ = "Production" 41 42 43class LoopIsAlreadyBegun(Exception): 44 """ 45 You can not run NetIOUserApi.start() if it was already started (and still running). 46 """ 47 pass 48 49 50class WrongConnectionType(Exception): 51 """ 52 You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of 53 connections are made only from (and by) inside of IO loop and logic itself. 54 """ 55 pass 56 57 58class CanNotMakeConnection(Exception): 59 """ 60 Currently not used. If there will be some exception on connect() call - it will be raised without any changes. 61 """ 62 pass 63 64 65# class ConnectionType(enum.Enum): 66# passive = 0 # passive socket (bind()) 67# active_accepted = 1 # active accepted socket (accept()) 68# active_connected = 2 # active connected socket (connect()) 69# 70# 71# class ConnectionState(enum.Enum): 72# not_connected_yet = 0 # socket is not in connection process 73# waiting_for_connection = 1 # socket is in connection process (async connection is delayed) 74# connected = 2 # socket is successfully connected 75# worker_fault = 3 # there was unhandled exception from one of the WorkerBase callbacks 76# io_fault = 4 # there was some IO trouble 77# waiting_for_disconnection = 5 # connection was marked as "should be closed" but was not closed yet 78# disconnected = 6 # socket is closed 79 80 81class NonSocketConnectionSettings: 82 pass 83 84 85# class ConnectionDirectionRole: 86# server = 0 87# client = 1 88 89 90class ConnectionType: 91 passive = 0 # passive socket (bind()) 92 active_accepted = 1 # active accepted socket (accept()) 93 active_connected = 2 # active connected socket (connect()) 94 95 96class ConnectionState: 97 not_connected_yet = 0 # socket is not in connection process 98 waiting_for_connection = 1 # socket is in connection process (async connection is delayed) 99 connected = 2 # socket is successfully connected 100 worker_fault = 3 # there was unhandled exception from one of the WorkerBase callbacks 101 io_fault = 4 # there was some IO trouble 102 waiting_for_disconnection = 5 # connection was marked as "should be closed" but was not closed yet 103 disconnected = 6 # socket is closed 104 105 106class ConnectionInfo: 107 def __init__(self, 108 worker_obj, 109 connection_type: ConnectionType, 110 socket_address=None, 111 socket_family=socket.AF_INET, 112 socket_type=socket.SOCK_STREAM, 113 socket_protocol=0, 114 socket_fileno=None, 115 backlog=0): 116 """ 117 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 118 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 119 by copy.copy() call (see WorkerBase.__copy__() method for more info) 120 :param connection_type: see ConnectionType() description 121 :param socket_address: see socket.bind()/socket.connect() docs 122 :param socket_family: see socket.socket() docs 123 :param socket_type: see socket.socket() docs 124 :param socket_protocol: see socket.socket() docs 125 :param socket_fileno: see socket.socket() docs 126 :param backlog: see socket.listen() docs 127 """ 128 self.worker_obj = worker_obj 129 self.connection_type = connection_type 130 self.socket_address = socket_address 131 self.socket_family = socket_family 132 self.socket_type = socket_type 133 self.socket_protocol = socket_protocol 134 self.socket_fileno = socket_fileno 135 self.backlog = backlog 136 137 138class Connection: 139 """ 140 Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself 141 """ 142 def __init__(self, 143 connection_id, 144 connection_info: ConnectionInfo, 145 connection_and_address_pair: tuple, 146 connection_state: ConnectionState, 147 connection_name=None, 148 ): 149 """ 150 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 151 GUID if you are creating connection by your self. 152 :param connection_info: new connection will be created using information provided in connection_info object. 153 See ConnectionInfo() for more information 154 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 155 is in the process of connection. But only if it was made by IO loop.). 156 :param connection_state: see ConnectionState for more information 157 :param connection_name: name of the connection (if it was provided) 158 """ 159 self.connection_id = connection_id 160 self.connection_info = connection_info 161 self.conn, self.address = connection_and_address_pair 162 self.connection_state = connection_state 163 self.connection_name = connection_name 164 self.worker_obj = connection_info.worker_obj 165 self.read_data = b'' # already read data 166 self.must_be_written_data = memoryview(b'') # this data should be written 167 self.force_write_call = False 168 169 def add_must_be_written_data(self, data): 170 """ 171 Use this method to add data to output buffers 172 :param data: some new output data to be send through this connection 173 :return: 174 """ 175 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data) 176 177 178class NetIOUserApi: 179 """ 180 You may rely and use freely use methods of this base class from inside your program or from inside your worker 181 (WorkerBase). 182 """ 183 def __init__(self): 184 super().__init__() 185 self.all_connections = set() 186 self.passive_connections = set() 187 188 self.connection_by_id = dict() 189 self.connection_by_name = dict() 190 self.connection_by_fileno = dict() 191 192 def start(self, destroy_on_finish=True): 193 """ 194 Will start IO loop 195 :param destroy_on_finish: if True - destroy() will be called from inside of this method 196 :return: 197 """ 198 raise NotImplementedError() 199 200 def stop(self): 201 """ 202 Will initiate IO loop stop process 203 :return: 204 """ 205 raise NotImplementedError() 206 207 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 208 """ 209 Will create connection from given connection_info object. Than connection will be established. Immediate or 210 delayed - depends on the connection type: 211 - ConnectionType.passive - immediate; 212 - ConnectionType.active_connected - delayed. 213 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 214 established (IF it will be successfully established). 215 :param connection_info: new connection will be created using information provided in connection_info object. 216 See ConnectionInfo() for more information 217 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 218 NetIOUserApi.connection_by_name dictionary by it's name. 219 :return: 220 """ 221 raise NotImplementedError() 222 223 def add_connection(self, connection: Connection): 224 """ 225 Will register already established connection. You need to use this method for example if you have already 226 connected socket 227 :param connection: 228 :return: 229 """ 230 raise NotImplementedError() 231 232 def remove_connection(self, connection: Connection): 233 """ 234 Will close and remove connection 235 :param connection: 236 :return: 237 """ 238 raise NotImplementedError() 239 240 def check_is_connection_need_to_sent_data(self, connection: Connection): 241 """ 242 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 243 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 244 connection by your self. 245 :param connection: 246 :return: 247 """ 248 raise NotImplementedError() 249 250 251class NetIOCallbacks: 252 """ 253 Callbacks from this class will be called from inside (and by) IOMethodBase loop. 254 """ 255 def __init__(self): 256 super().__init__() 257 258 def on_accept_connection(self, connection: Connection): 259 raise NotImplementedError() 260 261 def on_connected(self, connection: Connection): 262 raise NotImplementedError() 263 264 def on_read(self, connection: Connection): 265 raise NotImplementedError() 266 267 def on_write(self, connection: Connection): 268 raise NotImplementedError() 269 270 def on_close(self, connection: Connection): 271 raise NotImplementedError() 272 273 274class NetIOBase(NetIOUserApi, NetIOCallbacks): 275 """ 276 Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.). 277 """ 278 def __init__(self, transport): 279 """ 280 281 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 282 """ 283 super().__init__() 284 self.method = transport(self) 285 286 def destroy(self): 287 raise NotImplementedError() 288 289 290class WorkerBase: 291 """ 292 Base class for all workers. 293 on_* callbacks will be called by the IO loop. 294 295 General info: 296 You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any 297 callback. 298 You can write output data (to be send) to self.connection at any time (see "Caution") from any callback. 299 """ 300 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 301 """ 302 Caution: 303 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 304 they will be set before any callback call, but not at the construction process. 305 306 :param api: link to the constructed network io object 307 :param connection: link to the constructed connection object 308 """ 309 self.api = api 310 self.connection = connection 311 312 def on_connect(self): 313 """ 314 Will be called after connection successfully established 315 :return: 316 """ 317 pass 318 319 def on_read(self): 320 """ 321 Will be called if there is some NEW data in the connection input buffer 322 :return: 323 """ 324 pass 325 326 def on_no_more_data_to_write(self): 327 """ 328 Will be called after all data is sent. 329 Normally it will be called once (one single shot after each portion of out data is sent). 330 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 331 (but not guaranteed: it depends of used IOMethod implementation) 332 :return: 333 """ 334 pass 335 336 def on_connection_lost(self): 337 """ 338 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 339 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 340 :return: 341 """ 342 pass 343 344 def __copy__(self): 345 """ 346 This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from 347 current object to that new instance. It will be called when new peer is connected to existing passive connection 348 So this is the way you may use to give all new connection some links to some global data by worker object of 349 the passive connection replication process. 350 :return: 351 """ 352 raise NotImplementedError() 353 354 355class InlineWorkerBase: 356 __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names', 357 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately', 358 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages', 359 '__hold__client_id') 360 361 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 362 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 363 """ 364 365 :param keyword: client keyword. You may check for a known keywords to act appropriately 366 :param socket_family: 367 :param socket_type: 368 :param socket_proto: 369 :param addr_info: result of socket.getaddrinfo() call 370 :param host_names: result of socket.gethostbyaddr() call 371 """ 372 self.client_id = client_id 373 self.keyword = keyword 374 self.socket_family = socket_family 375 self.socket_type = socket_type 376 self.socket_proto = socket_proto 377 self.addr_info = addr_info 378 self.host_names = host_names 379 self.is_in_raw_mode = None 380 381 # self.output_messages = FIFODequeWithLengthControl() 382 self.output_messages = deque() 383 # self.output_messages = list() 384 385 def on__data_received(self, data: bytes): 386 """ 387 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 388 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 389 be logged 390 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 391 """ 392 pass 393 394 def on__output_buffers_are_empty(self): 395 """ 396 Will be called immediately when all output data was send. 397 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 398 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 399 be logged 400 """ 401 pass 402 403 def on__connection_lost(self): 404 """ 405 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 406 Situation with unhandled exception will be logged. 407 """ 408 pass 409 410 def set__is_in_raw_mode(self, is_in_raw_mode: bool): 411 raise NotImplementedError() 412 413 def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool): 414 raise NotImplementedError() 415 416 def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool): 417 raise NotImplementedError() 418 419 def __getstate__(self): 420 raise NotImplementedError() 421 422 def __setstate__(self, state): 423 raise NotImplementedError() 424 425 426@contextmanager 427def net_io(net_io_obj: NetIOBase): 428 """ 429 Context manager. 430 Usage: 431 432 main_io = NetIOBase() 433 with(net_io(main_io)) as io: 434 print('Preparing connections') 435 connection1 = io.make_connection(...) 436 connection2 = io.make_connection(...) 437 k = c + 12 438 ... 439 connectionN = io.make_connection(...) 440 print('Starting IO loop') 441 print('IO loop was finished properly') 442 443 444 :param net_io_obj: constructed IO instance (object) 445 :return: 446 """ 447 try: 448 yield net_io_obj 449 net_io_obj.start(destroy_on_finish=False) 450 finally: 451 net_io_obj.destroy() 452 453 454class IOLoopBase: 455 """ 456 Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) 457 All his methods are called by the NetIOBase instance. 458 """ 459 def __init__(self, interface: NetIOBase): 460 self.interface = interface 461 self.should_be_closed = set() 462 pass 463 464 def loop_iteration(self, timeout=-1): 465 """ 466 Single IO loop iteration. 467 This method holds all IOMethod logic. 468 :param timeout: float or int. If timeout is negative, the call will block until there is an event. 469 :return: 470 """ 471 raise NotImplementedError() 472 473 def destroy(self): 474 """ 475 Initiates destruction process 476 :return: 477 """ 478 raise NotImplementedError() 479 480 def set__can_read(self, conn: socket.socket, state=True): 481 """ 482 Will allow (True) or disallow (False) "socket available to read" checks for socket 483 :param conn: target socket 484 :param state: True - allow; False - disallow 485 :return: 486 """ 487 raise NotImplementedError() 488 489 def set__need_write(self, conn: socket.socket, state=True): 490 """ 491 Will allow (True) or disallow (False) "socket available to write" checks for socket 492 :param conn: target socket 493 :param state: True - allow; False - disallow 494 :return: 495 """ 496 raise NotImplementedError() 497 498 def set__should_be_closed(self, conn: socket.socket): 499 """ 500 Mark socket as "should be closed" 501 :param conn: target socket 502 :return: 503 """ 504 raise NotImplementedError() 505 506 def add_connection(self, conn: socket.socket): 507 """ 508 Will add socket to the internal connections list 509 :param conn: target socket 510 :return: 511 """ 512 raise NotImplementedError() 513 514 def remove_connection(self, conn: socket.socket): 515 """ 516 Will remove socket from the internal connections list 517 :param conn: target socket 518 :return: 519 """ 520 raise NotImplementedError()
44class LoopIsAlreadyBegun(Exception): 45 """ 46 You can not run NetIOUserApi.start() if it was already started (and still running). 47 """ 48 pass
You can not run NetIOUserApi.start() if it was already started (and still running).
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
51class WrongConnectionType(Exception): 52 """ 53 You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of 54 connections are made only from (and by) inside of IO loop and logic itself. 55 """ 56 pass
You cannot run NetIOUserApi.make_connection() for ConnectionType.active_accepted connection. This kind of connections are made only from (and by) inside of IO loop and logic itself.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
59class CanNotMakeConnection(Exception): 60 """ 61 Currently not used. If there will be some exception on connect() call - it will be raised without any changes. 62 """ 63 pass
Currently not used. If there will be some exception on connect() call - it will be raised without any changes.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
97class ConnectionState: 98 not_connected_yet = 0 # socket is not in connection process 99 waiting_for_connection = 1 # socket is in connection process (async connection is delayed) 100 connected = 2 # socket is successfully connected 101 worker_fault = 3 # there was unhandled exception from one of the WorkerBase callbacks 102 io_fault = 4 # there was some IO trouble 103 waiting_for_disconnection = 5 # connection was marked as "should be closed" but was not closed yet 104 disconnected = 6 # socket is closed
107class ConnectionInfo: 108 def __init__(self, 109 worker_obj, 110 connection_type: ConnectionType, 111 socket_address=None, 112 socket_family=socket.AF_INET, 113 socket_type=socket.SOCK_STREAM, 114 socket_protocol=0, 115 socket_fileno=None, 116 backlog=0): 117 """ 118 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 119 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 120 by copy.copy() call (see WorkerBase.__copy__() method for more info) 121 :param connection_type: see ConnectionType() description 122 :param socket_address: see socket.bind()/socket.connect() docs 123 :param socket_family: see socket.socket() docs 124 :param socket_type: see socket.socket() docs 125 :param socket_protocol: see socket.socket() docs 126 :param socket_fileno: see socket.socket() docs 127 :param backlog: see socket.listen() docs 128 """ 129 self.worker_obj = worker_obj 130 self.connection_type = connection_type 131 self.socket_address = socket_address 132 self.socket_family = socket_family 133 self.socket_type = socket_type 134 self.socket_protocol = socket_protocol 135 self.socket_fileno = socket_fileno 136 self.backlog = backlog
108 def __init__(self, 109 worker_obj, 110 connection_type: ConnectionType, 111 socket_address=None, 112 socket_family=socket.AF_INET, 113 socket_type=socket.SOCK_STREAM, 114 socket_protocol=0, 115 socket_fileno=None, 116 backlog=0): 117 """ 118 :param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive 119 connection - it (worker_obj) will be inherited by the descendant active_accepted connections 120 by copy.copy() call (see WorkerBase.__copy__() method for more info) 121 :param connection_type: see ConnectionType() description 122 :param socket_address: see socket.bind()/socket.connect() docs 123 :param socket_family: see socket.socket() docs 124 :param socket_type: see socket.socket() docs 125 :param socket_protocol: see socket.socket() docs 126 :param socket_fileno: see socket.socket() docs 127 :param backlog: see socket.listen() docs 128 """ 129 self.worker_obj = worker_obj 130 self.connection_type = connection_type 131 self.socket_address = socket_address 132 self.socket_family = socket_family 133 self.socket_type = socket_type 134 self.socket_protocol = socket_protocol 135 self.socket_fileno = socket_fileno 136 self.backlog = backlog
:param worker_obj: constructed worker object (see WorkerBase for more info). If this is a passive connection - it (worker_obj) will be inherited by the descendant active_accepted connections by copy.copy() call (see WorkerBase.__copy__() method for more info) :param connection_type: see ConnectionType() description :param socket_address: see socket.bind()/socket.connect() docs :param socket_family: see socket.socket() docs :param socket_type: see socket.socket() docs :param socket_protocol: see socket.socket() docs :param socket_fileno: see socket.socket() docs :param backlog: see socket.listen() docs
139class Connection: 140 """ 141 Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself 142 """ 143 def __init__(self, 144 connection_id, 145 connection_info: ConnectionInfo, 146 connection_and_address_pair: tuple, 147 connection_state: ConnectionState, 148 connection_name=None, 149 ): 150 """ 151 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 152 GUID if you are creating connection by your self. 153 :param connection_info: new connection will be created using information provided in connection_info object. 154 See ConnectionInfo() for more information 155 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 156 is in the process of connection. But only if it was made by IO loop.). 157 :param connection_state: see ConnectionState for more information 158 :param connection_name: name of the connection (if it was provided) 159 """ 160 self.connection_id = connection_id 161 self.connection_info = connection_info 162 self.conn, self.address = connection_and_address_pair 163 self.connection_state = connection_state 164 self.connection_name = connection_name 165 self.worker_obj = connection_info.worker_obj 166 self.read_data = b'' # already read data 167 self.must_be_written_data = memoryview(b'') # this data should be written 168 self.force_write_call = False 169 170 def add_must_be_written_data(self, data): 171 """ 172 Use this method to add data to output buffers 173 :param data: some new output data to be send through this connection 174 :return: 175 """ 176 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)
Connection class. Usually created by IO loop or by IO API. But you can also create it by yourself
143 def __init__(self, 144 connection_id, 145 connection_info: ConnectionInfo, 146 connection_and_address_pair: tuple, 147 connection_state: ConnectionState, 148 connection_name=None, 149 ): 150 """ 151 :param connection_id: unique connection identificator (unique within the IO object). You may use some random 152 GUID if you are creating connection by your self. 153 :param connection_info: new connection will be created using information provided in connection_info object. 154 See ConnectionInfo() for more information 155 :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket 156 is in the process of connection. But only if it was made by IO loop.). 157 :param connection_state: see ConnectionState for more information 158 :param connection_name: name of the connection (if it was provided) 159 """ 160 self.connection_id = connection_id 161 self.connection_info = connection_info 162 self.conn, self.address = connection_and_address_pair 163 self.connection_state = connection_state 164 self.connection_name = connection_name 165 self.worker_obj = connection_info.worker_obj 166 self.read_data = b'' # already read data 167 self.must_be_written_data = memoryview(b'') # this data should be written 168 self.force_write_call = False
:param connection_id: unique connection identificator (unique within the IO object). You may use some random GUID if you are creating connection by your self. :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param connection_and_address_pair: (conn, address) tuple where conn is connected socket (or it can be socket is in the process of connection. But only if it was made by IO loop.). :param connection_state: see ConnectionState for more information :param connection_name: name of the connection (if it was provided)
170 def add_must_be_written_data(self, data): 171 """ 172 Use this method to add data to output buffers 173 :param data: some new output data to be send through this connection 174 :return: 175 """ 176 self.must_be_written_data = memoryview(bytes(self.must_be_written_data) + data)
Use this method to add data to output buffers :param data: some new output data to be send through this connection :return:
179class NetIOUserApi: 180 """ 181 You may rely and use freely use methods of this base class from inside your program or from inside your worker 182 (WorkerBase). 183 """ 184 def __init__(self): 185 super().__init__() 186 self.all_connections = set() 187 self.passive_connections = set() 188 189 self.connection_by_id = dict() 190 self.connection_by_name = dict() 191 self.connection_by_fileno = dict() 192 193 def start(self, destroy_on_finish=True): 194 """ 195 Will start IO loop 196 :param destroy_on_finish: if True - destroy() will be called from inside of this method 197 :return: 198 """ 199 raise NotImplementedError() 200 201 def stop(self): 202 """ 203 Will initiate IO loop stop process 204 :return: 205 """ 206 raise NotImplementedError() 207 208 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 209 """ 210 Will create connection from given connection_info object. Than connection will be established. Immediate or 211 delayed - depends on the connection type: 212 - ConnectionType.passive - immediate; 213 - ConnectionType.active_connected - delayed. 214 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 215 established (IF it will be successfully established). 216 :param connection_info: new connection will be created using information provided in connection_info object. 217 See ConnectionInfo() for more information 218 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 219 NetIOUserApi.connection_by_name dictionary by it's name. 220 :return: 221 """ 222 raise NotImplementedError() 223 224 def add_connection(self, connection: Connection): 225 """ 226 Will register already established connection. You need to use this method for example if you have already 227 connected socket 228 :param connection: 229 :return: 230 """ 231 raise NotImplementedError() 232 233 def remove_connection(self, connection: Connection): 234 """ 235 Will close and remove connection 236 :param connection: 237 :return: 238 """ 239 raise NotImplementedError() 240 241 def check_is_connection_need_to_sent_data(self, connection: Connection): 242 """ 243 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 244 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 245 connection by your self. 246 :param connection: 247 :return: 248 """ 249 raise NotImplementedError()
You may rely and use freely use methods of this base class from inside your program or from inside your worker (WorkerBase).
193 def start(self, destroy_on_finish=True): 194 """ 195 Will start IO loop 196 :param destroy_on_finish: if True - destroy() will be called from inside of this method 197 :return: 198 """ 199 raise NotImplementedError()
Will start IO loop :param destroy_on_finish: if True - destroy() will be called from inside of this method :return:
201 def stop(self): 202 """ 203 Will initiate IO loop stop process 204 :return: 205 """ 206 raise NotImplementedError()
Will initiate IO loop stop process :return:
208 def make_connection(self, connection_info: ConnectionInfo = None, name=None)->Connection: 209 """ 210 Will create connection from given connection_info object. Than connection will be established. Immediate or 211 delayed - depends on the connection type: 212 - ConnectionType.passive - immediate; 213 - ConnectionType.active_connected - delayed. 214 In both cases WorkerBase.on_connect will be called immediately after connection will be successfully 215 established (IF it will be successfully established). 216 :param connection_info: new connection will be created using information provided in connection_info object. 217 See ConnectionInfo() for more information 218 :param name: name of the connection. If you'll provide it - you will be able to find this connection in 219 NetIOUserApi.connection_by_name dictionary by it's name. 220 :return: 221 """ 222 raise NotImplementedError()
Will create connection from given connection_info object. Than connection will be established. Immediate or delayed - depends on the connection type:
- ConnectionType.passive - immediate;
- ConnectionType.active_connected - delayed. In both cases WorkerBase.on_connect will be called immediately after connection will be successfully established (IF it will be successfully established). :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param name: name of the connection. If you'll provide it - you will be able to find this connection in NetIOUserApi.connection_by_name dictionary by it's name. :return:
224 def add_connection(self, connection: Connection): 225 """ 226 Will register already established connection. You need to use this method for example if you have already 227 connected socket 228 :param connection: 229 :return: 230 """ 231 raise NotImplementedError()
Will register already established connection. You need to use this method for example if you have already connected socket :param connection: :return:
233 def remove_connection(self, connection: Connection): 234 """ 235 Will close and remove connection 236 :param connection: 237 :return: 238 """ 239 raise NotImplementedError()
Will close and remove connection :param connection: :return:
241 def check_is_connection_need_to_sent_data(self, connection: Connection): 242 """ 243 Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call 244 by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that 245 connection by your self. 246 :param connection: 247 :return: 248 """ 249 raise NotImplementedError()
Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that connection by your self. :param connection: :return:
252class NetIOCallbacks: 253 """ 254 Callbacks from this class will be called from inside (and by) IOMethodBase loop. 255 """ 256 def __init__(self): 257 super().__init__() 258 259 def on_accept_connection(self, connection: Connection): 260 raise NotImplementedError() 261 262 def on_connected(self, connection: Connection): 263 raise NotImplementedError() 264 265 def on_read(self, connection: Connection): 266 raise NotImplementedError() 267 268 def on_write(self, connection: Connection): 269 raise NotImplementedError() 270 271 def on_close(self, connection: Connection): 272 raise NotImplementedError()
Callbacks from this class will be called from inside (and by) IOMethodBase loop.
275class NetIOBase(NetIOUserApi, NetIOCallbacks): 276 """ 277 Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.). 278 """ 279 def __init__(self, transport): 280 """ 281 282 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 283 """ 284 super().__init__() 285 self.method = transport(self) 286 287 def destroy(self): 288 raise NotImplementedError()
Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).
279 def __init__(self, transport): 280 """ 281 282 :param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself 283 """ 284 super().__init__() 285 self.method = transport(self)
:param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
291class WorkerBase: 292 """ 293 Base class for all workers. 294 on_* callbacks will be called by the IO loop. 295 296 General info: 297 You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any 298 callback. 299 You can write output data (to be send) to self.connection at any time (see "Caution") from any callback. 300 """ 301 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 302 """ 303 Caution: 304 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 305 they will be set before any callback call, but not at the construction process. 306 307 :param api: link to the constructed network io object 308 :param connection: link to the constructed connection object 309 """ 310 self.api = api 311 self.connection = connection 312 313 def on_connect(self): 314 """ 315 Will be called after connection successfully established 316 :return: 317 """ 318 pass 319 320 def on_read(self): 321 """ 322 Will be called if there is some NEW data in the connection input buffer 323 :return: 324 """ 325 pass 326 327 def on_no_more_data_to_write(self): 328 """ 329 Will be called after all data is sent. 330 Normally it will be called once (one single shot after each portion of out data is sent). 331 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 332 (but not guaranteed: it depends of used IOMethod implementation) 333 :return: 334 """ 335 pass 336 337 def on_connection_lost(self): 338 """ 339 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 340 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 341 :return: 342 """ 343 pass 344 345 def __copy__(self): 346 """ 347 This method SHOULD be implemented. It should create a new instance and copy some global (shared) data from 348 current object to that new instance. It will be called when new peer is connected to existing passive connection 349 So this is the way you may use to give all new connection some links to some global data by worker object of 350 the passive connection replication process. 351 :return: 352 """ 353 raise NotImplementedError()
Base class for all workers. on_* callbacks will be called by the IO loop.
General info: You can read input data from self.connection at any time (see "Caution" section of __init__ doc string) from any callback. You can write output data (to be send) to self.connection at any time (see "Caution") from any callback.
301 def __init__(self, api: NetIOUserApi=None, connection: Connection=None): 302 """ 303 Caution: 304 Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that 305 they will be set before any callback call, but not at the construction process. 306 307 :param api: link to the constructed network io object 308 :param connection: link to the constructed connection object 309 """ 310 self.api = api 311 self.connection = connection
Caution: Please do not rely on self.api and self.connection inside of your __init__ constructor: it is guaranteed that they will be set before any callback call, but not at the construction process.
:param api: link to the constructed network io object :param connection: link to the constructed connection object
313 def on_connect(self): 314 """ 315 Will be called after connection successfully established 316 :return: 317 """ 318 pass
Will be called after connection successfully established :return:
320 def on_read(self): 321 """ 322 Will be called if there is some NEW data in the connection input buffer 323 :return: 324 """ 325 pass
Will be called if there is some NEW data in the connection input buffer :return:
327 def on_no_more_data_to_write(self): 328 """ 329 Will be called after all data is sent. 330 Normally it will be called once (one single shot after each portion of out data is sent). 331 If you'll set self.connection.force_write_call to True state - this callback may be called continuously 332 (but not guaranteed: it depends of used IOMethod implementation) 333 :return: 334 """ 335 pass
Will be called after all data is sent. Normally it will be called once (one single shot after each portion of out data is sent). If you'll set self.connection.force_write_call to True state - this callback may be called continuously (but not guaranteed: it depends of used IOMethod implementation) :return:
337 def on_connection_lost(self): 338 """ 339 Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. 340 At this time, self.connection.connection_state is already set to ConnectionState.disconnected. 341 :return: 342 """ 343 pass
Will be called AFTER connection socket was actually closed and removed from IOMethod checking list. At this time, self.connection.connection_state is already set to ConnectionState.disconnected. :return:
356class InlineWorkerBase: 357 __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names', 358 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately', 359 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages', 360 '__hold__client_id') 361 362 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 363 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 364 """ 365 366 :param keyword: client keyword. You may check for a known keywords to act appropriately 367 :param socket_family: 368 :param socket_type: 369 :param socket_proto: 370 :param addr_info: result of socket.getaddrinfo() call 371 :param host_names: result of socket.gethostbyaddr() call 372 """ 373 self.client_id = client_id 374 self.keyword = keyword 375 self.socket_family = socket_family 376 self.socket_type = socket_type 377 self.socket_proto = socket_proto 378 self.addr_info = addr_info 379 self.host_names = host_names 380 self.is_in_raw_mode = None 381 382 # self.output_messages = FIFODequeWithLengthControl() 383 self.output_messages = deque() 384 # self.output_messages = list() 385 386 def on__data_received(self, data: bytes): 387 """ 388 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 389 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 390 be logged 391 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 392 """ 393 pass 394 395 def on__output_buffers_are_empty(self): 396 """ 397 Will be called immediately when all output data was send. 398 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 399 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 400 be logged 401 """ 402 pass 403 404 def on__connection_lost(self): 405 """ 406 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 407 Situation with unhandled exception will be logged. 408 """ 409 pass 410 411 def set__is_in_raw_mode(self, is_in_raw_mode: bool): 412 raise NotImplementedError() 413 414 def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool): 415 raise NotImplementedError() 416 417 def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool): 418 raise NotImplementedError() 419 420 def __getstate__(self): 421 raise NotImplementedError() 422 423 def __setstate__(self, state): 424 raise NotImplementedError()
362 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 363 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 364 """ 365 366 :param keyword: client keyword. You may check for a known keywords to act appropriately 367 :param socket_family: 368 :param socket_type: 369 :param socket_proto: 370 :param addr_info: result of socket.getaddrinfo() call 371 :param host_names: result of socket.gethostbyaddr() call 372 """ 373 self.client_id = client_id 374 self.keyword = keyword 375 self.socket_family = socket_family 376 self.socket_type = socket_type 377 self.socket_proto = socket_proto 378 self.addr_info = addr_info 379 self.host_names = host_names 380 self.is_in_raw_mode = None 381 382 # self.output_messages = FIFODequeWithLengthControl() 383 self.output_messages = deque() 384 # self.output_messages = list()
:param keyword: client keyword. You may check for a known keywords to act appropriately :param socket_family: :param socket_type: :param socket_proto: :param addr_info: result of socket.getaddrinfo() call :param host_names: result of socket.gethostbyaddr() call
386 def on__data_received(self, data: bytes): 387 """ 388 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 389 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 390 be logged 391 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 392 """ 393 pass
Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged :param data: piece of input data if connection is in RAW-mode and full message otherwise.
395 def on__output_buffers_are_empty(self): 396 """ 397 Will be called immediately when all output data was send. 398 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 399 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 400 be logged 401 """ 402 pass
Will be called immediately when all output data was send. Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged
404 def on__connection_lost(self): 405 """ 406 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 407 Situation with unhandled exception will be logged. 408 """ 409 pass
Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. Situation with unhandled exception will be logged.
427@contextmanager 428def net_io(net_io_obj: NetIOBase): 429 """ 430 Context manager. 431 Usage: 432 433 main_io = NetIOBase() 434 with(net_io(main_io)) as io: 435 print('Preparing connections') 436 connection1 = io.make_connection(...) 437 connection2 = io.make_connection(...) 438 k = c + 12 439 ... 440 connectionN = io.make_connection(...) 441 print('Starting IO loop') 442 print('IO loop was finished properly') 443 444 445 :param net_io_obj: constructed IO instance (object) 446 :return: 447 """ 448 try: 449 yield net_io_obj 450 net_io_obj.start(destroy_on_finish=False) 451 finally: 452 net_io_obj.destroy()
Context manager. Usage:
main_io = NetIOBase() with(net_io(main_io)) as io: print('Preparing connections') connection1 = io.make_connection(...) connection2 = io.make_connection(...) k = c + 12 ... connectionN = io.make_connection(...) print('Starting IO loop') print('IO loop was finished properly')
:param net_io_obj: constructed IO instance (object) :return:
455class IOLoopBase: 456 """ 457 Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) 458 All his methods are called by the NetIOBase instance. 459 """ 460 def __init__(self, interface: NetIOBase): 461 self.interface = interface 462 self.should_be_closed = set() 463 pass 464 465 def loop_iteration(self, timeout=-1): 466 """ 467 Single IO loop iteration. 468 This method holds all IOMethod logic. 469 :param timeout: float or int. If timeout is negative, the call will block until there is an event. 470 :return: 471 """ 472 raise NotImplementedError() 473 474 def destroy(self): 475 """ 476 Initiates destruction process 477 :return: 478 """ 479 raise NotImplementedError() 480 481 def set__can_read(self, conn: socket.socket, state=True): 482 """ 483 Will allow (True) or disallow (False) "socket available to read" checks for socket 484 :param conn: target socket 485 :param state: True - allow; False - disallow 486 :return: 487 """ 488 raise NotImplementedError() 489 490 def set__need_write(self, conn: socket.socket, state=True): 491 """ 492 Will allow (True) or disallow (False) "socket available to write" checks for socket 493 :param conn: target socket 494 :param state: True - allow; False - disallow 495 :return: 496 """ 497 raise NotImplementedError() 498 499 def set__should_be_closed(self, conn: socket.socket): 500 """ 501 Mark socket as "should be closed" 502 :param conn: target socket 503 :return: 504 """ 505 raise NotImplementedError() 506 507 def add_connection(self, conn: socket.socket): 508 """ 509 Will add socket to the internal connections list 510 :param conn: target socket 511 :return: 512 """ 513 raise NotImplementedError() 514 515 def remove_connection(self, conn: socket.socket): 516 """ 517 Will remove socket from the internal connections list 518 :param conn: target socket 519 :return: 520 """ 521 raise NotImplementedError()
Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.
465 def loop_iteration(self, timeout=-1): 466 """ 467 Single IO loop iteration. 468 This method holds all IOMethod logic. 469 :param timeout: float or int. If timeout is negative, the call will block until there is an event. 470 :return: 471 """ 472 raise NotImplementedError()
Single IO loop iteration. This method holds all IOMethod logic. :param timeout: float or int. If timeout is negative, the call will block until there is an event. :return:
474 def destroy(self): 475 """ 476 Initiates destruction process 477 :return: 478 """ 479 raise NotImplementedError()
Initiates destruction process :return:
481 def set__can_read(self, conn: socket.socket, state=True): 482 """ 483 Will allow (True) or disallow (False) "socket available to read" checks for socket 484 :param conn: target socket 485 :param state: True - allow; False - disallow 486 :return: 487 """ 488 raise NotImplementedError()
Will allow (True) or disallow (False) "socket available to read" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
490 def set__need_write(self, conn: socket.socket, state=True): 491 """ 492 Will allow (True) or disallow (False) "socket available to write" checks for socket 493 :param conn: target socket 494 :param state: True - allow; False - disallow 495 :return: 496 """ 497 raise NotImplementedError()
Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
499 def set__should_be_closed(self, conn: socket.socket): 500 """ 501 Mark socket as "should be closed" 502 :param conn: target socket 503 :return: 504 """ 505 raise NotImplementedError()
Mark socket as "should be closed" :param conn: target socket :return:
507 def add_connection(self, conn: socket.socket): 508 """ 509 Will add socket to the internal connections list 510 :param conn: target socket 511 :return: 512 """ 513 raise NotImplementedError()
Will add socket to the internal connections list :param conn: target socket :return:
515 def remove_connection(self, conn: socket.socket): 516 """ 517 Will remove socket from the internal connections list 518 :param conn: target socket 519 :return: 520 """ 521 raise NotImplementedError()
Will remove socket from the internal connections list :param conn: target socket :return: