cengal.io.net_io.versions.v_0.net_io_method__epoll_lt

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18import select
 19from .net_io_abstract import *
 20
 21"""
 22Module Docstring
 23Docstrings: http://www.python.org/dev/peps/pep-0257/
 24"""
 25
 26__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 29__license__ = "Apache License, Version 2.0"
 30__version__ = "4.4.1"
 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 32__email__ = "gtalk@butenkoms.space"
 33# __status__ = "Prototype"
 34__status__ = "Development"
 35# __status__ = "Production"
 36
 37
 38class IOMethodEpollLT(IOMethodBase):
 39    def __init__(self, interface: NetIOBase):
 40        super().__init__(interface)
 41        self.epoll = select.epoll()
 42
 43    def loop_iteration(self, timeout=None):
 44        timeout = timeout or -1
 45
 46        events = self.epoll.poll(timeout)
 47        for fileno, event in events:
 48            connection = self.interface.connection_by_fileno[fileno]
 49
 50            if event & select.EPOLLIN:
 51                # Read available. We can try to read even even if an error occurred
 52                if ConnectionType.passive == connection.connection_info.connection_type:
 53                    self.interface.on_accept_connection(connection)
 54                else:
 55                    self.interface.on_read(connection)
 56
 57            if event & select.EPOLLHUP:
 58                # Some error. Connection should be closed
 59                self.should_be_closed.add(connection.conn)
 60            elif event & select.EPOLLOUT:
 61                # Write available. We will not write data if an error occurred
 62                if ConnectionState.waiting_for_connection == connection.connection_state:
 63                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 64                        # Connected successfully:
 65                        self.interface.on_connected(connection)
 66                    else:
 67                        # Some connection error - will be closed:
 68                        self.should_be_closed.add(connection.conn)
 69                else:
 70                    self.interface.on_write(connection)
 71
 72            self._close_all()
 73
 74    def _close_all(self):
 75        should_be_closed = self.should_be_closed
 76        self.should_be_closed = set()
 77        for conn in should_be_closed:
 78            fileno = conn.fileno()
 79            if fileno in self.interface.connection_by_fileno:
 80                connection = self.interface.connection_by_fileno[fileno]
 81                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 82                #   callback
 83            else:
 84                self.remove_connection(conn)
 85
 86    def destroy(self):
 87        self._close_all()
 88        self.epoll.close()
 89
 90    def add_connection(self, conn: socket.socket):
 91        self.epoll.register(conn.fileno(), select.EPOLLIN)
 92
 93    def remove_connection(self, conn: socket.socket):
 94        self.epoll.unregister(conn.fileno())
 95
 96    def set__need_write(self, conn: socket.socket, state=True):
 97        if state:
 98            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
 99        else:
100            self.epoll.modify(conn.fileno(), select.EPOLLIN)
101
102    def set__should_be_closed(self, conn: socket.socket):
103        self.should_be_closed.add(conn)
class IOMethodEpollLT(cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase):
 39class IOMethodEpollLT(IOMethodBase):
 40    def __init__(self, interface: NetIOBase):
 41        super().__init__(interface)
 42        self.epoll = select.epoll()
 43
 44    def loop_iteration(self, timeout=None):
 45        timeout = timeout or -1
 46
 47        events = self.epoll.poll(timeout)
 48        for fileno, event in events:
 49            connection = self.interface.connection_by_fileno[fileno]
 50
 51            if event & select.EPOLLIN:
 52                # Read available. We can try to read even even if an error occurred
 53                if ConnectionType.passive == connection.connection_info.connection_type:
 54                    self.interface.on_accept_connection(connection)
 55                else:
 56                    self.interface.on_read(connection)
 57
 58            if event & select.EPOLLHUP:
 59                # Some error. Connection should be closed
 60                self.should_be_closed.add(connection.conn)
 61            elif event & select.EPOLLOUT:
 62                # Write available. We will not write data if an error occurred
 63                if ConnectionState.waiting_for_connection == connection.connection_state:
 64                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 65                        # Connected successfully:
 66                        self.interface.on_connected(connection)
 67                    else:
 68                        # Some connection error - will be closed:
 69                        self.should_be_closed.add(connection.conn)
 70                else:
 71                    self.interface.on_write(connection)
 72
 73            self._close_all()
 74
 75    def _close_all(self):
 76        should_be_closed = self.should_be_closed
 77        self.should_be_closed = set()
 78        for conn in should_be_closed:
 79            fileno = conn.fileno()
 80            if fileno in self.interface.connection_by_fileno:
 81                connection = self.interface.connection_by_fileno[fileno]
 82                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 83                #   callback
 84            else:
 85                self.remove_connection(conn)
 86
 87    def destroy(self):
 88        self._close_all()
 89        self.epoll.close()
 90
 91    def add_connection(self, conn: socket.socket):
 92        self.epoll.register(conn.fileno(), select.EPOLLIN)
 93
 94    def remove_connection(self, conn: socket.socket):
 95        self.epoll.unregister(conn.fileno())
 96
 97    def set__need_write(self, conn: socket.socket, state=True):
 98        if state:
 99            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
100        else:
101            self.epoll.modify(conn.fileno(), select.EPOLLIN)
102
103    def set__should_be_closed(self, conn: socket.socket):
104        self.should_be_closed.add(conn)

Base class for all IOMethod implementations (select, epoll, overlapped I/O, kqueue, etc.). All of its methods are called by the NetIOBase instance.

IOMethodEpollLT(interface: cengal.io.net_io.versions.v_0.net_io_abstract.NetIOBase)
40    def __init__(self, interface: NetIOBase):
41        super().__init__(interface)
42        self.epoll = select.epoll()
epoll
def loop_iteration(self, timeout=None):
44    def loop_iteration(self, timeout=None):
45        timeout = timeout or -1
46
47        events = self.epoll.poll(timeout)
48        for fileno, event in events:
49            connection = self.interface.connection_by_fileno[fileno]
50
51            if event & select.EPOLLIN:
52                # Read available. We can try to read even even if an error occurred
53                if ConnectionType.passive == connection.connection_info.connection_type:
54                    self.interface.on_accept_connection(connection)
55                else:
56                    self.interface.on_read(connection)
57
58            if event & select.EPOLLHUP:
59                # Some error. Connection should be closed
60                self.should_be_closed.add(connection.conn)
61            elif event & select.EPOLLOUT:
62                # Write available. We will not write data if an error occurred
63                if ConnectionState.waiting_for_connection == connection.connection_state:
64                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
65                        # Connected successfully:
66                        self.interface.on_connected(connection)
67                    else:
68                        # Some connection error - will be closed:
69                        self.should_be_closed.add(connection.conn)
70                else:
71                    self.interface.on_write(connection)
72
73            self._close_all()

Single IO loop iteration. This method holds all IOMethod logic. :return:

def destroy(self):
87    def destroy(self):
88        self._close_all()
89        self.epoll.close()

Initiates destruction process :return:

def add_connection(self, conn: socket.socket):
91    def add_connection(self, conn: socket.socket):
92        self.epoll.register(conn.fileno(), select.EPOLLIN)

Will add socket to the internal connections list :param conn: target socket :return:

def remove_connection(self, conn: socket.socket):
94    def remove_connection(self, conn: socket.socket):
95        self.epoll.unregister(conn.fileno())

Will remove socket from the internal connections list :param conn: target socket :return:

def set__need_write(self, conn: socket.socket, state=True):
 97    def set__need_write(self, conn: socket.socket, state=True):
 98        if state:
 99            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
100        else:
101            self.epoll.modify(conn.fileno(), select.EPOLLIN)

Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__should_be_closed(self, conn: socket.socket):
103    def set__should_be_closed(self, conn: socket.socket):
104        self.should_be_closed.add(conn)

Mark socket as "should be closed" :param conn: target socket :return:

Inherited Members
cengal.io.net_io.versions.v_0.net_io_abstract.IOMethodBase
interface
should_be_closed
set__can_read