cengal.io.net_io.versions.v_0.net_io_method__select

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18import select
 19from .net_io_abstract import *
 20
 21"""
 22Module Docstring
 23Docstrings: http://www.python.org/dev/peps/pep-0257/
 24"""
 25
 26__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 29__license__ = "Apache License, Version 2.0"
 30__version__ = "4.4.1"
 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 32__email__ = "gtalk@butenkoms.space"
 33# __status__ = "Prototype"
 34__status__ = "Development"
 35# __status__ = "Production"
 36
 37
 38class IOMethodSelect(IOMethodBase):
 39    def __init__(self, interface: NetIOBase):
 40        super().__init__(interface)
 41        self.input_check_sockets = set()
 42        self.output_check_sockets = set()
 43        self.exception_check_sockets = set()
 44
 45    def loop_iteration(self, timeout=None):
 46        timeout = timeout or -1
 47
 48        readable, writable, exceptional = select.select(self.input_check_sockets,
 49                                                        self.output_check_sockets,
 50                                                        self.exception_check_sockets,
 51                                                        timeout)
 52
 53        for fileno in readable:
 54            # Read available. We can try to read even even if an error occurred
 55            connection = self.interface.connection_by_fileno[fileno]
 56            if ConnectionType.passive == connection.connection_info.connection_type:
 57                self.interface.on_accept_connection(connection)
 58            else:
 59                self.interface.on_read(connection)
 60
 61        for fileno in exceptional:
 62            # Some error. Connection should be closed
 63            connection = self.interface.connection_by_fileno[fileno]
 64            self.should_be_closed.add(connection.conn)
 65
 66        writable -= exceptional
 67
 68        for fileno in writable:
 69            # Write available. We will not write data if an error occurred
 70            connection = self.interface.connection_by_fileno[fileno]
 71            if ConnectionState.waiting_for_connection == connection.connection_state:
 72                if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 73                    # Connected successfully:
 74                    self.interface.on_connected(connection)
 75                else:
 76                    # Some connection error - will be closed:
 77                    self.should_be_closed.add(connection.conn)
 78            else:
 79                self.interface.on_write(connection)
 80
 81        self._close_all()
 82
 83    def _close_all(self):
 84        should_be_closed = self.should_be_closed
 85        self.should_be_closed = set()
 86        for conn in should_be_closed:
 87            fileno = conn.fileno()
 88            if fileno in self.interface.connection_by_fileno:
 89                connection = self.interface.connection_by_fileno[fileno]
 90                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 91                #   callback
 92            else:
 93                self.remove_connection(conn)
 94
 95    def destroy(self):
 96        self._close_all()
 97
 98    def add_connection(self, conn: socket.socket):
 99        fileno = conn.fileno()
100        self.exception_check_sockets.add(fileno)
101        self.input_check_sockets.add(fileno)
102
103    def remove_connection(self, conn: socket.socket):
104        fileno = conn.fileno()
105        if fileno in self.exception_check_sockets:
106            self.exception_check_sockets.remove(fileno)
107        if fileno in self.input_check_sockets:
108            self.input_check_sockets.remove(fileno)
109        if fileno in self.output_check_sockets:
110            self.output_check_sockets.remove(fileno)
111
112    def set__need_write(self, conn: socket.socket, state=True):
113        fileno = conn.fileno()
114        if fileno in self.exception_check_sockets:
115            if state:
116                self.output_check_sockets.add(fileno)
117            else:
118                if fileno in self.output_check_sockets:
119                    self.output_check_sockets.remove(fileno)
120
121    def set__should_be_closed(self, conn: socket.socket):
122        self.should_be_closed.add(conn)
class IOMethodSelect(cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase):
 39class IOMethodSelect(IOMethodBase):
 40    def __init__(self, interface: NetIOBase):
 41        super().__init__(interface)
 42        self.input_check_sockets = set()
 43        self.output_check_sockets = set()
 44        self.exception_check_sockets = set()
 45
 46    def loop_iteration(self, timeout=None):
 47        timeout = timeout or -1
 48
 49        readable, writable, exceptional = select.select(self.input_check_sockets,
 50                                                        self.output_check_sockets,
 51                                                        self.exception_check_sockets,
 52                                                        timeout)
 53
 54        for fileno in readable:
 55            # Read available. We can try to read even even if an error occurred
 56            connection = self.interface.connection_by_fileno[fileno]
 57            if ConnectionType.passive == connection.connection_info.connection_type:
 58                self.interface.on_accept_connection(connection)
 59            else:
 60                self.interface.on_read(connection)
 61
 62        for fileno in exceptional:
 63            # Some error. Connection should be closed
 64            connection = self.interface.connection_by_fileno[fileno]
 65            self.should_be_closed.add(connection.conn)
 66
 67        writable -= exceptional
 68
 69        for fileno in writable:
 70            # Write available. We will not write data if an error occurred
 71            connection = self.interface.connection_by_fileno[fileno]
 72            if ConnectionState.waiting_for_connection == connection.connection_state:
 73                if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 74                    # Connected successfully:
 75                    self.interface.on_connected(connection)
 76                else:
 77                    # Some connection error - will be closed:
 78                    self.should_be_closed.add(connection.conn)
 79            else:
 80                self.interface.on_write(connection)
 81
 82        self._close_all()
 83
 84    def _close_all(self):
 85        should_be_closed = self.should_be_closed
 86        self.should_be_closed = set()
 87        for conn in should_be_closed:
 88            fileno = conn.fileno()
 89            if fileno in self.interface.connection_by_fileno:
 90                connection = self.interface.connection_by_fileno[fileno]
 91                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 92                #   callback
 93            else:
 94                self.remove_connection(conn)
 95
 96    def destroy(self):
 97        self._close_all()
 98
 99    def add_connection(self, conn: socket.socket):
100        fileno = conn.fileno()
101        self.exception_check_sockets.add(fileno)
102        self.input_check_sockets.add(fileno)
103
104    def remove_connection(self, conn: socket.socket):
105        fileno = conn.fileno()
106        if fileno in self.exception_check_sockets:
107            self.exception_check_sockets.remove(fileno)
108        if fileno in self.input_check_sockets:
109            self.input_check_sockets.remove(fileno)
110        if fileno in self.output_check_sockets:
111            self.output_check_sockets.remove(fileno)
112
113    def set__need_write(self, conn: socket.socket, state=True):
114        fileno = conn.fileno()
115        if fileno in self.exception_check_sockets:
116            if state:
117                self.output_check_sockets.add(fileno)
118            else:
119                if fileno in self.output_check_sockets:
120                    self.output_check_sockets.remove(fileno)
121
122    def set__should_be_closed(self, conn: socket.socket):
123        self.should_be_closed.add(conn)

Base class for all IOMethod implementations (select, epoll, overlapped I/O, kqueue, etc.). All of its methods are called by the NetIOBase instance.

IOMethodSelect(interface: cengal.io.net_io.versions.v_0.net_io_abstract.NetIOBase)
40    def __init__(self, interface: NetIOBase):
41        super().__init__(interface)
42        self.input_check_sockets = set()
43        self.output_check_sockets = set()
44        self.exception_check_sockets = set()
input_check_sockets
output_check_sockets
exception_check_sockets
def loop_iteration(self, timeout=None):
46    def loop_iteration(self, timeout=None):
47        timeout = timeout or -1
48
49        readable, writable, exceptional = select.select(self.input_check_sockets,
50                                                        self.output_check_sockets,
51                                                        self.exception_check_sockets,
52                                                        timeout)
53
54        for fileno in readable:
55            # Read available. We can try to read even even if an error occurred
56            connection = self.interface.connection_by_fileno[fileno]
57            if ConnectionType.passive == connection.connection_info.connection_type:
58                self.interface.on_accept_connection(connection)
59            else:
60                self.interface.on_read(connection)
61
62        for fileno in exceptional:
63            # Some error. Connection should be closed
64            connection = self.interface.connection_by_fileno[fileno]
65            self.should_be_closed.add(connection.conn)
66
67        writable -= exceptional
68
69        for fileno in writable:
70            # Write available. We will not write data if an error occurred
71            connection = self.interface.connection_by_fileno[fileno]
72            if ConnectionState.waiting_for_connection == connection.connection_state:
73                if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
74                    # Connected successfully:
75                    self.interface.on_connected(connection)
76                else:
77                    # Some connection error - will be closed:
78                    self.should_be_closed.add(connection.conn)
79            else:
80                self.interface.on_write(connection)
81
82        self._close_all()

Single IO loop iteration. This method holds all IOMethod logic. :return:

def destroy(self):
96    def destroy(self):
97        self._close_all()

Initiates destruction process :return:

def add_connection(self, conn: socket.socket):
 99    def add_connection(self, conn: socket.socket):
100        fileno = conn.fileno()
101        self.exception_check_sockets.add(fileno)
102        self.input_check_sockets.add(fileno)

Will add socket to the internal connections list :param conn: target socket :return:

def remove_connection(self, conn: socket.socket):
104    def remove_connection(self, conn: socket.socket):
105        fileno = conn.fileno()
106        if fileno in self.exception_check_sockets:
107            self.exception_check_sockets.remove(fileno)
108        if fileno in self.input_check_sockets:
109            self.input_check_sockets.remove(fileno)
110        if fileno in self.output_check_sockets:
111            self.output_check_sockets.remove(fileno)

Will remove socket from the internal connections list :param conn: target socket :return:

def set__need_write(self, conn: socket.socket, state=True):
113    def set__need_write(self, conn: socket.socket, state=True):
114        fileno = conn.fileno()
115        if fileno in self.exception_check_sockets:
116            if state:
117                self.output_check_sockets.add(fileno)
118            else:
119                if fileno in self.output_check_sockets:
120                    self.output_check_sockets.remove(fileno)

Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__should_be_closed(self, conn: socket.socket):
122    def set__should_be_closed(self, conn: socket.socket):
123        self.should_be_closed.add(conn)

Mark socket as "should be closed" :param conn: target socket :return:

Inherited Members
cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase
interface
should_be_closed
set__can_read