cengal.io.asock_io.versions.v_1.io_loops.select
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import select 19from ..abstract import * 20 21""" 22Module Docstring 23Docstrings: http://www.python.org/dev/peps/pep-0257/ 24""" 25 26__author__ = "ButenkoMS <gtalk@butenkoms.space>" 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 29__license__ = "Apache License, Version 2.0" 30__version__ = "4.4.1" 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 32__email__ = "gtalk@butenkoms.space" 33# __status__ = "Prototype" 34__status__ = "Development" 35# __status__ = "Production" 36 37 38class IOLoopSelect(IOLoopBase): 39 def __init__(self, interface: NetIOBase): 40 super().__init__(interface) 41 self._input_check_sockets = set() 42 self._output_check_sockets = set() 43 self._exception_check_sockets = set() 44 self.select = select.select 45 46 def loop_iteration(self, timeout=-1): 47 if not self._input_check_sockets: 48 return 49 50 readable, writable, exceptional = select.select(self._input_check_sockets, 51 self._output_check_sockets, 52 self._exception_check_sockets, 53 timeout) 54 55 # READ 56 read_is_forbidden = True 57 if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \ 58 <= self.interface.global_in__data_size_limit.result: 59 read_is_forbidden = False 60 61 # Handle inputs 62 for fileno in readable: 63 connection = self.interface.connection_by_fileno[fileno] 64 # Read available. We can try to read event even if an error occurred 65 if ConnectionType.passive == connection.connection_info.connection_type: 66 self.interface.on_accept_connection(connection) 67 else: 68 self.interface.on_read(connection) 69 # read_result = self._read_data_from_socket(s) 70 # if read_result: 71 # # read_result = self._read_messages_from_raw_input_into_fifo(s) 72 # # if read_result: 73 # if s in self._unconfirmed_clients: 74 # self._process_client_keyword(s) 75 # self._check_is_client_have_data_to_read_in_fifo(s) 76 # else: 77 # self._client_have_data_to_read_in_fifo(s) 78 79 if __debug__: 80 read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 81 read_is_forbidden) 82 if read_is_forbidden_test is not None: 83 if read_is_forbidden_test: 84 print('Read is suppressed until data will be processed.') 85 else: 86 print('Read is allowed: data is processed.') 87 # WRITE 88 # Handle outputs 89 for fileno in writable: 90 curr_client_info = self._connections[self._connection_by_conn[s]] 91 self._write_data_to_socket(curr_client_info) 92 # self._write_data_to_socket(s) 93 94 # ================= 95 96 events = self.epoll.poll(timeout) 97 for fileno, event in events: 98 connection = self.interface.connection_by_fileno[fileno] 99 100 if event & select.EPOLLIN: 101 # Read available. We can try to read event even if an error occurred 102 if ConnectionType.passive == connection.connection_info.connection_type: 103 self.interface.on_accept_connection(connection) 104 else: 105 self.interface.on_read(connection) 106 107 if event & select.EPOLLHUP: 108 # Some error. Connection should be closed 109 self.should_be_closed.add(connection.conn) 110 elif event & select.EPOLLOUT: 111 # Write available. We will not write data if an error occurred 112 if ConnectionState.waiting_for_connection == connection.connection_state: 113 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 114 # Connected successfully: 115 self.interface.on_connected(connection) 116 else: 117 # Some connection error - will be closed: 118 self.should_be_closed.add(connection.conn) 119 else: 120 self.interface.on_write(connection) 121 122 self._close_all() 123 124 def _close_all(self): 125 should_be_closed = self.should_be_closed 126 self.should_be_closed = set() 127 for conn in should_be_closed: 128 if conn.fileno() in self.interface.connection_by_fileno: 129 connection = self.interface.connection_by_fileno[conn.fileno()] 130 self.interface.on_close(connection) # self.remove_connection() should be run from inside of this 131 # callback 132 else: 133 self.remove_connection(conn) 134 135 def destroy(self): 136 self._close_all() 137 self.epoll.close() 138 139 def add_connection(self, conn: socket.socket): 140 self.epoll.register(conn.fileno(), select.EPOLLIN) 141 142 def remove_connection(self, conn: socket.socket): 143 self.epoll.unregister(conn.fileno()) 144 145 def set__need_write(self, conn: socket.socket, state=True): 146 if state: 147 self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT) 148 else: 149 self.epoll.modify(conn.fileno(), select.EPOLLIN) 150 151 def set__should_be_closed(self, conn: socket.socket): 152 self.should_be_closed.add(conn)
class
IOLoopSelect(cengal.io.asock_io.versions.v_1.abstract.IOLoopBase):
39class IOLoopSelect(IOLoopBase): 40 def __init__(self, interface: NetIOBase): 41 super().__init__(interface) 42 self._input_check_sockets = set() 43 self._output_check_sockets = set() 44 self._exception_check_sockets = set() 45 self.select = select.select 46 47 def loop_iteration(self, timeout=-1): 48 if not self._input_check_sockets: 49 return 50 51 readable, writable, exceptional = select.select(self._input_check_sockets, 52 self._output_check_sockets, 53 self._exception_check_sockets, 54 timeout) 55 56 # READ 57 read_is_forbidden = True 58 if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \ 59 <= self.interface.global_in__data_size_limit.result: 60 read_is_forbidden = False 61 62 # Handle inputs 63 for fileno in readable: 64 connection = self.interface.connection_by_fileno[fileno] 65 # Read available. We can try to read event even if an error occurred 66 if ConnectionType.passive == connection.connection_info.connection_type: 67 self.interface.on_accept_connection(connection) 68 else: 69 self.interface.on_read(connection) 70 # read_result = self._read_data_from_socket(s) 71 # if read_result: 72 # # read_result = self._read_messages_from_raw_input_into_fifo(s) 73 # # if read_result: 74 # if s in self._unconfirmed_clients: 75 # self._process_client_keyword(s) 76 # self._check_is_client_have_data_to_read_in_fifo(s) 77 # else: 78 # self._client_have_data_to_read_in_fifo(s) 79 80 if __debug__: 81 read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 82 read_is_forbidden) 83 if read_is_forbidden_test is not None: 84 if read_is_forbidden_test: 85 print('Read is suppressed until data will be processed.') 86 else: 87 print('Read is allowed: data is processed.') 88 # WRITE 89 # Handle outputs 90 for fileno in writable: 91 curr_client_info = self._connections[self._connection_by_conn[s]] 92 self._write_data_to_socket(curr_client_info) 93 # self._write_data_to_socket(s) 94 95 # ================= 96 97 events = self.epoll.poll(timeout) 98 for fileno, event in events: 99 connection = self.interface.connection_by_fileno[fileno] 100 101 if event & select.EPOLLIN: 102 # Read available. We can try to read event even if an error occurred 103 if ConnectionType.passive == connection.connection_info.connection_type: 104 self.interface.on_accept_connection(connection) 105 else: 106 self.interface.on_read(connection) 107 108 if event & select.EPOLLHUP: 109 # Some error. Connection should be closed 110 self.should_be_closed.add(connection.conn) 111 elif event & select.EPOLLOUT: 112 # Write available. We will not write data if an error occurred 113 if ConnectionState.waiting_for_connection == connection.connection_state: 114 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 115 # Connected successfully: 116 self.interface.on_connected(connection) 117 else: 118 # Some connection error - will be closed: 119 self.should_be_closed.add(connection.conn) 120 else: 121 self.interface.on_write(connection) 122 123 self._close_all() 124 125 def _close_all(self): 126 should_be_closed = self.should_be_closed 127 self.should_be_closed = set() 128 for conn in should_be_closed: 129 if conn.fileno() in self.interface.connection_by_fileno: 130 connection = self.interface.connection_by_fileno[conn.fileno()] 131 self.interface.on_close(connection) # self.remove_connection() should be run from inside of this 132 # callback 133 else: 134 self.remove_connection(conn) 135 136 def destroy(self): 137 self._close_all() 138 self.epoll.close() 139 140 def add_connection(self, conn: socket.socket): 141 self.epoll.register(conn.fileno(), select.EPOLLIN) 142 143 def remove_connection(self, conn: socket.socket): 144 self.epoll.unregister(conn.fileno()) 145 146 def set__need_write(self, conn: socket.socket, state=True): 147 if state: 148 self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT) 149 else: 150 self.epoll.modify(conn.fileno(), select.EPOLLIN) 151 152 def set__should_be_closed(self, conn: socket.socket): 153 self.should_be_closed.add(conn)
Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.
def
loop_iteration(self, timeout=-1):
47 def loop_iteration(self, timeout=-1): 48 if not self._input_check_sockets: 49 return 50 51 readable, writable, exceptional = select.select(self._input_check_sockets, 52 self._output_check_sockets, 53 self._exception_check_sockets, 54 timeout) 55 56 # READ 57 read_is_forbidden = True 58 if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \ 59 <= self.interface.global_in__data_size_limit.result: 60 read_is_forbidden = False 61 62 # Handle inputs 63 for fileno in readable: 64 connection = self.interface.connection_by_fileno[fileno] 65 # Read available. We can try to read event even if an error occurred 66 if ConnectionType.passive == connection.connection_info.connection_type: 67 self.interface.on_accept_connection(connection) 68 else: 69 self.interface.on_read(connection) 70 # read_result = self._read_data_from_socket(s) 71 # if read_result: 72 # # read_result = self._read_messages_from_raw_input_into_fifo(s) 73 # # if read_result: 74 # if s in self._unconfirmed_clients: 75 # self._process_client_keyword(s) 76 # self._check_is_client_have_data_to_read_in_fifo(s) 77 # else: 78 # self._client_have_data_to_read_in_fifo(s) 79 80 if __debug__: 81 read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 82 read_is_forbidden) 83 if read_is_forbidden_test is not None: 84 if read_is_forbidden_test: 85 print('Read is suppressed until data will be processed.') 86 else: 87 print('Read is allowed: data is processed.') 88 # WRITE 89 # Handle outputs 90 for fileno in writable: 91 curr_client_info = self._connections[self._connection_by_conn[s]] 92 self._write_data_to_socket(curr_client_info) 93 # self._write_data_to_socket(s) 94 95 # ================= 96 97 events = self.epoll.poll(timeout) 98 for fileno, event in events: 99 connection = self.interface.connection_by_fileno[fileno] 100 101 if event & select.EPOLLIN: 102 # Read available. We can try to read event even if an error occurred 103 if ConnectionType.passive == connection.connection_info.connection_type: 104 self.interface.on_accept_connection(connection) 105 else: 106 self.interface.on_read(connection) 107 108 if event & select.EPOLLHUP: 109 # Some error. Connection should be closed 110 self.should_be_closed.add(connection.conn) 111 elif event & select.EPOLLOUT: 112 # Write available. We will not write data if an error occurred 113 if ConnectionState.waiting_for_connection == connection.connection_state: 114 if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR): 115 # Connected successfully: 116 self.interface.on_connected(connection) 117 else: 118 # Some connection error - will be closed: 119 self.should_be_closed.add(connection.conn) 120 else: 121 self.interface.on_write(connection) 122 123 self._close_all()
Single IO loop iteration. This method holds all IOMethod logic. :param timeout: float or int. If timeout is negative, the call will block until there is an event. :return:
def
add_connection(self, conn: socket.socket):
140 def add_connection(self, conn: socket.socket): 141 self.epoll.register(conn.fileno(), select.EPOLLIN)
Will add socket to the internal connections list :param conn: target socket :return:
def
remove_connection(self, conn: socket.socket):
Will remove socket from the internal connections list :param conn: target socket :return:
def
set__need_write(self, conn: socket.socket, state=True):
146 def set__need_write(self, conn: socket.socket, state=True): 147 if state: 148 self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT) 149 else: 150 self.epoll.modify(conn.fileno(), select.EPOLLIN)
Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:
def
set__should_be_closed(self, conn: socket.socket):
Mark socket as "should be closed" :param conn: target socket :return:
Inherited Members
- cengal.io.asock_io.versions.v_1.abstract.IOLoopBase
- interface
- should_be_closed
- set__can_read