cengal.io.asock_io.versions.v_1.io_loops.select

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18import select
 19from ..abstract import *
 20
 21"""
 22Module Docstring
 23Docstrings: http://www.python.org/dev/peps/pep-0257/
 24"""
 25
 26__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 29__license__ = "Apache License, Version 2.0"
 30__version__ = "4.4.1"
 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 32__email__ = "gtalk@butenkoms.space"
 33# __status__ = "Prototype"
 34__status__ = "Development"
 35# __status__ = "Production"
 36
 37
 38class IOLoopSelect(IOLoopBase):
 39    def __init__(self, interface: NetIOBase):
 40        super().__init__(interface)
 41        self._input_check_sockets = set()
 42        self._output_check_sockets = set()
 43        self._exception_check_sockets = set()
 44        self.select = select.select
 45
 46    def loop_iteration(self, timeout=-1):
 47        if not self._input_check_sockets:
 48            return
 49
 50        readable, writable, exceptional = select.select(self._input_check_sockets,
 51                                                        self._output_check_sockets,
 52                                                        self._exception_check_sockets,
 53                                                        timeout)
 54
 55        # READ
 56        read_is_forbidden = True
 57        if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \
 58                <= self.interface.global_in__data_size_limit.result:
 59            read_is_forbidden = False
 60
 61            # Handle inputs
 62            for fileno in readable:
 63                connection = self.interface.connection_by_fileno[fileno]
 64                # Read available. We can try to read event even if an error occurred
 65                if ConnectionType.passive == connection.connection_info.connection_type:
 66                    self.interface.on_accept_connection(connection)
 67                else:
 68                    self.interface.on_read(connection)
 69                # read_result = self._read_data_from_socket(s)
 70                # if read_result:
 71                #     # read_result = self._read_messages_from_raw_input_into_fifo(s)
 72                #     # if read_result:
 73                #     if s in self._unconfirmed_clients:
 74                #         self._process_client_keyword(s)
 75                #         self._check_is_client_have_data_to_read_in_fifo(s)
 76                #     else:
 77                #         self._client_have_data_to_read_in_fifo(s)
 78
 79        if __debug__:
 80            read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
 81                    read_is_forbidden)
 82            if read_is_forbidden_test is not None:
 83                if read_is_forbidden_test:
 84                    print('Read is suppressed until data will be processed.')
 85                else:
 86                    print('Read is allowed: data is processed.')
 87        # WRITE
 88        # Handle outputs
 89        for fileno in writable:
 90            curr_client_info = self._connections[self._connection_by_conn[s]]
 91            self._write_data_to_socket(curr_client_info)
 92            # self._write_data_to_socket(s)
 93
 94        # =================
 95
 96        events = self.epoll.poll(timeout)
 97        for fileno, event in events:
 98            connection = self.interface.connection_by_fileno[fileno]
 99
100            if event & select.EPOLLIN:
101                # Read available. We can try to read event even if an error occurred
102                if ConnectionType.passive == connection.connection_info.connection_type:
103                    self.interface.on_accept_connection(connection)
104                else:
105                    self.interface.on_read(connection)
106
107            if event & select.EPOLLHUP:
108                # Some error. Connection should be closed
109                self.should_be_closed.add(connection.conn)
110            elif event & select.EPOLLOUT:
111                # Write available. We will not write data if an error occurred
112                if ConnectionState.waiting_for_connection == connection.connection_state:
113                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
114                        # Connected successfully:
115                        self.interface.on_connected(connection)
116                    else:
117                        # Some connection error - will be closed:
118                        self.should_be_closed.add(connection.conn)
119                else:
120                    self.interface.on_write(connection)
121
122            self._close_all()
123
124    def _close_all(self):
125        should_be_closed = self.should_be_closed
126        self.should_be_closed = set()
127        for conn in should_be_closed:
128            if conn.fileno() in self.interface.connection_by_fileno:
129                connection = self.interface.connection_by_fileno[conn.fileno()]
130                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
131                #   callback
132            else:
133                self.remove_connection(conn)
134
135    def destroy(self):
136        self._close_all()
137        self.epoll.close()
138
139    def add_connection(self, conn: socket.socket):
140        self.epoll.register(conn.fileno(), select.EPOLLIN)
141
142    def remove_connection(self, conn: socket.socket):
143        self.epoll.unregister(conn.fileno())
144
145    def set__need_write(self, conn: socket.socket, state=True):
146        if state:
147            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
148        else:
149            self.epoll.modify(conn.fileno(), select.EPOLLIN)
150
151    def set__should_be_closed(self, conn: socket.socket):
152        self.should_be_closed.add(conn)
class IOLoopSelect(cengal.io.asock_io.versions.v_1.abstract.IOLoopBase):
 39class IOLoopSelect(IOLoopBase):
 40    def __init__(self, interface: NetIOBase):
 41        super().__init__(interface)
 42        self._input_check_sockets = set()
 43        self._output_check_sockets = set()
 44        self._exception_check_sockets = set()
 45        self.select = select.select
 46
 47    def loop_iteration(self, timeout=-1):
 48        if not self._input_check_sockets:
 49            return
 50
 51        readable, writable, exceptional = select.select(self._input_check_sockets,
 52                                                        self._output_check_sockets,
 53                                                        self._exception_check_sockets,
 54                                                        timeout)
 55
 56        # READ
 57        read_is_forbidden = True
 58        if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \
 59                <= self.interface.global_in__data_size_limit.result:
 60            read_is_forbidden = False
 61
 62            # Handle inputs
 63            for fileno in readable:
 64                connection = self.interface.connection_by_fileno[fileno]
 65                # Read available. We can try to read event even if an error occurred
 66                if ConnectionType.passive == connection.connection_info.connection_type:
 67                    self.interface.on_accept_connection(connection)
 68                else:
 69                    self.interface.on_read(connection)
 70                # read_result = self._read_data_from_socket(s)
 71                # if read_result:
 72                #     # read_result = self._read_messages_from_raw_input_into_fifo(s)
 73                #     # if read_result:
 74                #     if s in self._unconfirmed_clients:
 75                #         self._process_client_keyword(s)
 76                #         self._check_is_client_have_data_to_read_in_fifo(s)
 77                #     else:
 78                #         self._client_have_data_to_read_in_fifo(s)
 79
 80        if __debug__:
 81            read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
 82                    read_is_forbidden)
 83            if read_is_forbidden_test is not None:
 84                if read_is_forbidden_test:
 85                    print('Read is suppressed until data will be processed.')
 86                else:
 87                    print('Read is allowed: data is processed.')
 88        # WRITE
 89        # Handle outputs
 90        for fileno in writable:
 91            curr_client_info = self._connections[self._connection_by_conn[s]]
 92            self._write_data_to_socket(curr_client_info)
 93            # self._write_data_to_socket(s)
 94
 95        # =================
 96
 97        events = self.epoll.poll(timeout)
 98        for fileno, event in events:
 99            connection = self.interface.connection_by_fileno[fileno]
100
101            if event & select.EPOLLIN:
102                # Read available. We can try to read event even if an error occurred
103                if ConnectionType.passive == connection.connection_info.connection_type:
104                    self.interface.on_accept_connection(connection)
105                else:
106                    self.interface.on_read(connection)
107
108            if event & select.EPOLLHUP:
109                # Some error. Connection should be closed
110                self.should_be_closed.add(connection.conn)
111            elif event & select.EPOLLOUT:
112                # Write available. We will not write data if an error occurred
113                if ConnectionState.waiting_for_connection == connection.connection_state:
114                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
115                        # Connected successfully:
116                        self.interface.on_connected(connection)
117                    else:
118                        # Some connection error - will be closed:
119                        self.should_be_closed.add(connection.conn)
120                else:
121                    self.interface.on_write(connection)
122
123            self._close_all()
124
125    def _close_all(self):
126        should_be_closed = self.should_be_closed
127        self.should_be_closed = set()
128        for conn in should_be_closed:
129            if conn.fileno() in self.interface.connection_by_fileno:
130                connection = self.interface.connection_by_fileno[conn.fileno()]
131                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
132                #   callback
133            else:
134                self.remove_connection(conn)
135
136    def destroy(self):
137        self._close_all()
138        self.epoll.close()
139
140    def add_connection(self, conn: socket.socket):
141        self.epoll.register(conn.fileno(), select.EPOLLIN)
142
143    def remove_connection(self, conn: socket.socket):
144        self.epoll.unregister(conn.fileno())
145
146    def set__need_write(self, conn: socket.socket, state=True):
147        if state:
148            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
149        else:
150            self.epoll.modify(conn.fileno(), select.EPOLLIN)
151
152    def set__should_be_closed(self, conn: socket.socket):
153        self.should_be_closed.add(conn)

Base class for all IOMethod implementation (select, epoll, overlapped io, kqueue, etc.) All his methods are called by the NetIOBase instance.

IOLoopSelect(interface: cengal.io.asock_io.versions.v_1.abstract.NetIOBase)
40    def __init__(self, interface: NetIOBase):
41        super().__init__(interface)
42        self._input_check_sockets = set()
43        self._output_check_sockets = set()
44        self._exception_check_sockets = set()
45        self.select = select.select
select
def loop_iteration(self, timeout=-1):
 47    def loop_iteration(self, timeout=-1):
 48        if not self._input_check_sockets:
 49            return
 50
 51        readable, writable, exceptional = select.select(self._input_check_sockets,
 52                                                        self._output_check_sockets,
 53                                                        self._exception_check_sockets,
 54                                                        timeout)
 55
 56        # READ
 57        read_is_forbidden = True
 58        if (self.interface.global_in__data_full_size.result - self.interface.global_in__deletable_data_full_size.result) \
 59                <= self.interface.global_in__data_size_limit.result:
 60            read_is_forbidden = False
 61
 62            # Handle inputs
 63            for fileno in readable:
 64                connection = self.interface.connection_by_fileno[fileno]
 65                # Read available. We can try to read event even if an error occurred
 66                if ConnectionType.passive == connection.connection_info.connection_type:
 67                    self.interface.on_accept_connection(connection)
 68                else:
 69                    self.interface.on_read(connection)
 70                # read_result = self._read_data_from_socket(s)
 71                # if read_result:
 72                #     # read_result = self._read_messages_from_raw_input_into_fifo(s)
 73                #     # if read_result:
 74                #     if s in self._unconfirmed_clients:
 75                #         self._process_client_keyword(s)
 76                #         self._check_is_client_have_data_to_read_in_fifo(s)
 77                #     else:
 78                #         self._client_have_data_to_read_in_fifo(s)
 79
 80        if __debug__:
 81            read_is_forbidden_test = self.interface.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
 82                    read_is_forbidden)
 83            if read_is_forbidden_test is not None:
 84                if read_is_forbidden_test:
 85                    print('Read is suppressed until data will be processed.')
 86                else:
 87                    print('Read is allowed: data is processed.')
 88        # WRITE
 89        # Handle outputs
 90        for fileno in writable:
 91            curr_client_info = self._connections[self._connection_by_conn[s]]
 92            self._write_data_to_socket(curr_client_info)
 93            # self._write_data_to_socket(s)
 94
 95        # =================
 96
 97        events = self.epoll.poll(timeout)
 98        for fileno, event in events:
 99            connection = self.interface.connection_by_fileno[fileno]
100
101            if event & select.EPOLLIN:
102                # Read available. We can try to read event even if an error occurred
103                if ConnectionType.passive == connection.connection_info.connection_type:
104                    self.interface.on_accept_connection(connection)
105                else:
106                    self.interface.on_read(connection)
107
108            if event & select.EPOLLHUP:
109                # Some error. Connection should be closed
110                self.should_be_closed.add(connection.conn)
111            elif event & select.EPOLLOUT:
112                # Write available. We will not write data if an error occurred
113                if ConnectionState.waiting_for_connection == connection.connection_state:
114                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
115                        # Connected successfully:
116                        self.interface.on_connected(connection)
117                    else:
118                        # Some connection error - will be closed:
119                        self.should_be_closed.add(connection.conn)
120                else:
121                    self.interface.on_write(connection)
122
123            self._close_all()

Single IO loop iteration. This method holds all IOMethod logic. :param timeout: float or int. If timeout is negative, the call will block until there is an event. :return:

def destroy(self):
136    def destroy(self):
137        self._close_all()
138        self.epoll.close()

Initiates destruction process :return:

def add_connection(self, conn: socket.socket):
140    def add_connection(self, conn: socket.socket):
141        self.epoll.register(conn.fileno(), select.EPOLLIN)

Will add socket to the internal connections list :param conn: target socket :return:

def remove_connection(self, conn: socket.socket):
143    def remove_connection(self, conn: socket.socket):
144        self.epoll.unregister(conn.fileno())

Will remove socket from the internal connections list :param conn: target socket :return:

def set__need_write(self, conn: socket.socket, state=True):
146    def set__need_write(self, conn: socket.socket, state=True):
147        if state:
148            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
149        else:
150            self.epoll.modify(conn.fileno(), select.EPOLLIN)

Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__should_be_closed(self, conn: socket.socket):
152    def set__should_be_closed(self, conn: socket.socket):
153        self.should_be_closed.add(conn)

Mark socket as "should be closed" :param conn: target socket :return:

Inherited Members
cengal.io.asock_io.versions.v_1.abstract.IOLoopBase
interface
should_be_closed
set__can_read