cengal.io.asock_io.versions.v_1.io_loops.epoll_lt

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18import select
 19from ..abstract import *
 20
 21"""
 22Module Docstring
 23Docstrings: http://www.python.org/dev/peps/pep-0257/
 24"""
 25
 26__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 29__license__ = "Apache License, Version 2.0"
 30__version__ = "4.4.1"
 31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 32__email__ = "gtalk@butenkoms.space"
 33# __status__ = "Prototype"
 34__status__ = "Development"
 35# __status__ = "Production"
 36
 37
 38class IOLoopEpollLT(IOLoopBase):
 39    def __init__(self, interface: NetIOBase):
 40        super().__init__(interface)
 41        self.epoll = select.epoll()
 42
 43    def loop_iteration(self, timeout=-1):
 44        events = self.epoll.poll(timeout)
 45        for fileno, event in events:
 46            connection = self.interface.connection_by_fileno[fileno]
 47
 48            if event & select.EPOLLIN:
 49                # Read available. We can try to read even event if an error occurred
 50                if ConnectionType.passive == connection.connection_info.connection_type:
 51                    self.interface.on_accept_connection(connection)
 52                else:
 53                    self.interface.on_read(connection)
 54
 55            if event & select.EPOLLHUP:
 56                # Some error. Connection should be closed
 57                self.should_be_closed.add(connection.conn)
 58            elif event & select.EPOLLOUT:
 59                # Write available. We will not write data if an error occurred
 60                if ConnectionState.waiting_for_connection == connection.connection_state:
 61                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 62                        # Connected successfully:
 63                        self.interface.on_connected(connection)
 64                    else:
 65                        # Some connection error - will be closed:
 66                        self.should_be_closed.add(connection.conn)
 67                else:
 68                    self.interface.on_write(connection)
 69
 70            self._close_all()
 71
 72    def _close_all(self):
 73        should_be_closed = self.should_be_closed
 74        self.should_be_closed = set()
 75        for conn in should_be_closed:
 76            if conn.fileno() in self.interface.connection_by_fileno:
 77                connection = self.interface.connection_by_fileno[conn.fileno()]
 78                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 79                #   callback
 80            else:
 81                self.remove_connection(conn)
 82
 83    def destroy(self):
 84        self._close_all()
 85        self.epoll.close()
 86
 87    def add_connection(self, conn: socket.socket):
 88        self.epoll.register(conn.fileno(), select.EPOLLIN)
 89
 90    def remove_connection(self, conn: socket.socket):
 91        self.epoll.unregister(conn.fileno())
 92
 93    def set__need_write(self, conn: socket.socket, state=True):
 94        if state:
 95            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
 96        else:
 97            self.epoll.modify(conn.fileno(), select.EPOLLIN)
 98
 99    def set__should_be_closed(self, conn: socket.socket):
100        self.should_be_closed.add(conn)
class IOLoopEpollLT(cengal.io.asock_io.versions.v_1.abstract.IOLoopBase):
 39class IOLoopEpollLT(IOLoopBase):
 40    def __init__(self, interface: NetIOBase):
 41        super().__init__(interface)
 42        self.epoll = select.epoll()
 43
 44    def loop_iteration(self, timeout=-1):
 45        events = self.epoll.poll(timeout)
 46        for fileno, event in events:
 47            connection = self.interface.connection_by_fileno[fileno]
 48
 49            if event & select.EPOLLIN:
 50                # Read available. We can try to read even event if an error occurred
 51                if ConnectionType.passive == connection.connection_info.connection_type:
 52                    self.interface.on_accept_connection(connection)
 53                else:
 54                    self.interface.on_read(connection)
 55
 56            if event & select.EPOLLHUP:
 57                # Some error. Connection should be closed
 58                self.should_be_closed.add(connection.conn)
 59            elif event & select.EPOLLOUT:
 60                # Write available. We will not write data if an error occurred
 61                if ConnectionState.waiting_for_connection == connection.connection_state:
 62                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
 63                        # Connected successfully:
 64                        self.interface.on_connected(connection)
 65                    else:
 66                        # Some connection error - will be closed:
 67                        self.should_be_closed.add(connection.conn)
 68                else:
 69                    self.interface.on_write(connection)
 70
 71            self._close_all()
 72
 73    def _close_all(self):
 74        should_be_closed = self.should_be_closed
 75        self.should_be_closed = set()
 76        for conn in should_be_closed:
 77            if conn.fileno() in self.interface.connection_by_fileno:
 78                connection = self.interface.connection_by_fileno[conn.fileno()]
 79                self.interface.on_close(connection)  # self.remove_connection() should be run from inside of this
 80                #   callback
 81            else:
 82                self.remove_connection(conn)
 83
 84    def destroy(self):
 85        self._close_all()
 86        self.epoll.close()
 87
 88    def add_connection(self, conn: socket.socket):
 89        self.epoll.register(conn.fileno(), select.EPOLLIN)
 90
 91    def remove_connection(self, conn: socket.socket):
 92        self.epoll.unregister(conn.fileno())
 93
 94    def set__need_write(self, conn: socket.socket, state=True):
 95        if state:
 96            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
 97        else:
 98            self.epoll.modify(conn.fileno(), select.EPOLLIN)
 99
100    def set__should_be_closed(self, conn: socket.socket):
101        self.should_be_closed.add(conn)

Base class for all IOMethod implementations (select, epoll, overlapped I/O, kqueue, etc.). All of its methods are called by the NetIOBase instance.

IOLoopEpollLT(interface: cengal.io.asock_io.versions.v_1.abstract.NetIOBase)
40    def __init__(self, interface: NetIOBase):
41        super().__init__(interface)
42        self.epoll = select.epoll()
epoll
def loop_iteration(self, timeout=-1):
44    def loop_iteration(self, timeout=-1):
45        events = self.epoll.poll(timeout)
46        for fileno, event in events:
47            connection = self.interface.connection_by_fileno[fileno]
48
49            if event & select.EPOLLIN:
50                # Read available. We can try to read even event if an error occurred
51                if ConnectionType.passive == connection.connection_info.connection_type:
52                    self.interface.on_accept_connection(connection)
53                else:
54                    self.interface.on_read(connection)
55
56            if event & select.EPOLLHUP:
57                # Some error. Connection should be closed
58                self.should_be_closed.add(connection.conn)
59            elif event & select.EPOLLOUT:
60                # Write available. We will not write data if an error occurred
61                if ConnectionState.waiting_for_connection == connection.connection_state:
62                    if not connection.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
63                        # Connected successfully:
64                        self.interface.on_connected(connection)
65                    else:
66                        # Some connection error - will be closed:
67                        self.should_be_closed.add(connection.conn)
68                else:
69                    self.interface.on_write(connection)
70
71            self._close_all()

Single IO loop iteration. This method holds all IOMethod logic. :param timeout: float or int. If timeout is negative, the call will block until there is an event. :return:

def destroy(self):
84    def destroy(self):
85        self._close_all()
86        self.epoll.close()

Initiates destruction process :return:

def add_connection(self, conn: socket.socket):
88    def add_connection(self, conn: socket.socket):
89        self.epoll.register(conn.fileno(), select.EPOLLIN)

Will add socket to the internal connections list :param conn: target socket :return:

def remove_connection(self, conn: socket.socket):
91    def remove_connection(self, conn: socket.socket):
92        self.epoll.unregister(conn.fileno())

Will remove socket from the internal connections list :param conn: target socket :return:

def set__need_write(self, conn: socket.socket, state=True):
94    def set__need_write(self, conn: socket.socket, state=True):
95        if state:
96            self.epoll.modify(conn.fileno(), select.EPOLLIN | select.EPOLLOUT)
97        else:
98            self.epoll.modify(conn.fileno(), select.EPOLLIN)

Will allow (True) or disallow (False) "socket available to write" checks for socket :param conn: target socket :param state: True - allow; False - disallow :return:

def set__should_be_closed(self, conn: socket.socket):
100    def set__should_be_closed(self, conn: socket.socket):
101        self.should_be_closed.add(conn)

Mark socket as "should be closed" :param conn: target socket :return:

Inherited Members
cengal.io.asock_io.versions.v_1.abstract.IOLoopBase
interface
should_be_closed
set__can_read