cengal.io.net_io.versions.v_0.net_io_method__select
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import select 19from .net_io_abstract import * 20 21""" 22Module Docstring 23Docstrings: http://www.python.org/dev/peps/pep-0257/ 24""" 25 26__author__ = "ButenkoMS <gtalk@butenkoms.space>" 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 29__license__ = "Apache License, Version 2.0" 30__version__ = "4.4.1" 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 32__email__ = "gtalk@butenkoms.space" 33# __status__ = "Prototype" 34__status__ = "Development" 35# __status__ = "Production" 36 37 38class IOMethodSelect(IOMethodBase): 39 def __init__(self, interface: NetIOBase): 40 super().__init__(interface) 41 self.input_check_sockets = set() 42 self.output_check_sockets = set() 43 self.exception_check_sockets = set() 44 45 def loop_iteration(self, timeout=None): 46 timeout = timeout or -1 47 48 readable, writable, exceptional = select.select(self.input_check_sockets, 49 self.output_check_sockets, 50 self.exception_check_sockets, 51 timeout) 52 53 for fileno in readable: 54 # Read available. We can try to read even even if an error occurred 55 connection = self.interface.connection_by_fileno[fileno] 56 if ConnectionType.passive == connection.connection_info.connection_type: 57 self.interface.on_accept_connection(connection) 58 else: 59 self.interface.on_read(connection) 60 61 for fileno in exceptional: 62 # Some error. Connection should be closed 63 connection = self.interface.connection_by_fileno[fileno] 64 self.should_be_closed.add(connection.conn) 65 66 writable -= exceptional 67 68 for fileno in writable: 69 # Write available. We will not write data if an error occurred 70 connection = self.interface.connection_by_fileno[fileno] 71 if ConnectionState.waiting_for_connection == connection.connection_state: 72 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 73 # Connected successfully: 74 self.interface.on_connected(connection) 75 else: 76 # Some connection error - will be closed: 77 self.should_be_closed.add(connection.conn) 78 else: 79 self.interface.on_write(connection) 80 81 self._close_all() 82 83 def _close_all(self): 84 should_be_closed = self.should_be_closed 85 self.should_be_closed = set() 86 for conn in should_be_closed: 87 fileno = conn.fileno() 88 if fileno in self.interface.connection_by_fileno: 89 connection = self.interface.connection_by_fileno[fileno] 90 self.interface.on_close(connection) # self.remove_connection() should be run from inside of this 91 # callback 92 else: 93 self.remove_connection(conn) 94 95 def destroy(self): 96 self._close_all() 97 98 def add_connection(self, conn: socket.socket): 99 fileno = conn.fileno() 100 self.exception_check_sockets.add(fileno) 101 self.input_check_sockets.add(fileno) 102 103 def remove_connection(self, conn: socket.socket): 104 fileno = conn.fileno() 105 if fileno in self.exception_check_sockets: 106 self.exception_check_sockets.remove(fileno) 107 if fileno in self.input_check_sockets: 108 self.input_check_sockets.remove(fileno) 109 if fileno in self.output_check_sockets: 110 self.output_check_sockets.remove(fileno) 111 112 def set__need_write(self, conn: socket.socket, state=True): 113 fileno = conn.fileno() 114 if fileno in self.exception_check_sockets: 115 if state: 116 self.output_check_sockets.add(fileno) 117 else: 118 if fileno in self.output_check_sockets: 119 self.output_check_sockets.remove(fileno) 120 121 def set__should_be_closed(self, conn: socket.socket): 122 self.should_be_closed.add(conn)
class
IOMethodSelect(cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase):
39class IOMethodSelect(IOMethodBase): 40 def __init__(self, interface: NetIOBase): 41 super().__init__(interface) 42 self.input_check_sockets = set() 43 self.output_check_sockets = set() 44 self.exception_check_sockets = set() 45 46 def loop_iteration(self, timeout=None): 47 timeout = timeout or -1 48 49 readable, writable, exceptional = select.select(self.input_check_sockets, 50 self.output_check_sockets, 51 self.exception_check_sockets, 52 timeout) 53 54 for fileno in readable: 55 # Read available. We can try to read even even if an error occurred 56 connection = self.interface.connection_by_fileno[fileno] 57 if ConnectionType.passive == connection.connection_info.connection_type: 58 self.interface.on_accept_connection(connection) 59 else: 60 self.interface.on_read(connection) 61 62 for fileno in exceptional: 63 # Some error. Connection should be closed 64 connection = self.interface.connection_by_fileno[fileno] 65 self.should_be_closed.add(connection.conn) 66 67 writable -= exceptional 68 69 for fileno in writable: 70 # Write available. We will not write data if an error occurred 71 connection = self.interface.connection_by_fileno[fileno] 72 if ConnectionState.waiting_for_connection == connection.connection_state: 73 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 74 # Connected successfully: 75 self.interface.on_connected(connection) 76 else: 77 # Some connection error - will be closed: 78 self.should_be_closed.add(connection.conn) 79 else: 80 self.interface.on_write(connection) 81 82 self._close_all() 83 84 def _close_all(self): 85 should_be_closed = self.should_be_closed 86 self.should_be_closed = set() 87 for conn in should_be_closed: 88 fileno = conn.fileno() 89 if fileno in self.interface.connection_by_fileno: 90 connection = self.interface.connection_by_fileno[fileno] 91 self.interface.on_close(connection) # self.remove_connection() should be run from inside of this 92 # callback 93 else: 94 self.remove_connection(conn) 95 96 def destroy(self): 97 self._close_all() 98 99 def add_connection(self, conn: socket.socket): 100 fileno = conn.fileno() 101 self.exception_check_sockets.add(fileno) 102 self.input_check_sockets.add(fileno) 103 104 def remove_connection(self, conn: socket.socket): 105 fileno = conn.fileno() 106 if fileno in self.exception_check_sockets: 107 self.exception_check_sockets.remove(fileno) 108 if fileno in self.input_check_sockets: 109 self.input_check_sockets.remove(fileno) 110 if fileno in self.output_check_sockets: 111 self.output_check_sockets.remove(fileno) 112 113 def set__need_write(self, conn: socket.socket, state=True): 114 fileno = conn.fileno() 115 if fileno in self.exception_check_sockets: 116 if state: 117 self.output_check_sockets.add(fileno) 118 else: 119 if fileno in self.output_check_sockets: 120 self.output_check_sockets.remove(fileno) 121 122 def set__should_be_closed(self, conn: socket.socket): 123 self.should_be_closed.add(conn)
Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.
def
loop_iteration(self, timeout=None):
46 def loop_iteration(self, timeout=None): 47 timeout = timeout or -1 48 49 readable, writable, exceptional = select.select(self.input_check_sockets, 50 self.output_check_sockets, 51 self.exception_check_sockets, 52 timeout) 53 54 for fileno in readable: 55 # Read available. We can try to read even even if an error occurred 56 connection = self.interface.connection_by_fileno[fileno] 57 if ConnectionType.passive == connection.connection_info.connection_type: 58 self.interface.on_accept_connection(connection) 59 else: 60 self.interface.on_read(connection) 61 62 for fileno in exceptional: 63 # Some error. Connection should be closed 64 connection = self.interface.connection_by_fileno[fileno] 65 self.should_be_closed.add(connection.conn) 66 67 writable -= exceptional 68 69 for fileno in writable: 70 # Write available. We will not write data if an error occurred 71 connection = self.interface.connection_by_fileno[fileno] 72 if ConnectionState.waiting_for_connection == connection.connection_state: 73 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 74 # Connected successfully: 75 self.interface.on_connected(connection) 76 else: 77 # Some connection error - will be closed: 78 self.should_be_closed.add(connection.conn) 79 else: 80 self.interface.on_write(connection) 81 82 self._close_all()
Single IO loop iteration. This method holds all IOMethod logic. :return:
def
add_connection(self, conn: socket.socket):
99 def add_connection(self, conn: socket.socket): 100 fileno = conn.fileno() 101 self.exception_check_sockets.add(fileno) 102 self.input_check_sockets.add(fileno)
Will add socket to the internal connections list :param conn: target socket :return:
def
remove_connection(self, conn: socket.socket):
104 def remove_connection(self, conn: socket.socket): 105 fileno = conn.fileno() 106 if fileno in self.exception_check_sockets: 107 self.exception_check_sockets.remove(fileno) 108 if fileno in self.input_check_sockets: 109 self.input_check_sockets.remove(fileno) 110 if fileno in self.output_check_sockets: 111 self.output_check_sockets.remove(fileno)
Will remove socket from the internal connections list :param conn: target socket :return:
def
set__need_write(self, conn: socket.socket, state=True):
113 def set__need_write(self, conn: socket.socket, state=True): 114 fileno = conn.fileno() 115 if fileno in self.exception_check_sockets: 116 if state: 117 self.output_check_sockets.add(fileno) 118 else: 119 if fileno in self.output_check_sockets: 120 self.output_check_sockets.remove(fileno)
Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
def
set__should_be_closed(self, conn: socket.socket):
Mark socket as "should be closed" :param conn: target socket :return:
Inherited Members
- cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase
- interface
- should_be_closed
- set__can_read