cengal.io.net_io.versions.v_0.net_io__linux

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18from .net_io_abstract import *
 19import sys
 20import traceback
 21
 22"""
 23Module Docstring
 24Docstrings: http://www.python.org/dev/peps/pep-0257/
 25"""
 26
 27__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 28__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 29__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 30__license__ = "Apache License, Version 2.0"
 31__version__ = "4.4.1"
 32__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 33__email__ = "gtalk@butenkoms.space"
 34# __status__ = "Prototype"
 35__status__ = "Development"
 36# __status__ = "Production"
 37
 38
 39class NetIO(NetIOBase):
 40    def __init__(self, transport):
 41        super().__init__(transport)
 42
 43        self._need_to_stop = False
 44        self._already_begun = False
 45
 46        self._new_connection_id = 0
 47
 48        self.need_to_print_exceptions_info = False
 49
 50    def destroy(self):
 51        self.method.destroy()
 52
 53    def start(self, destroy_on_finish=True):
 54        if self._already_begun:
 55            raise LoopIsAlreadyBegun()
 56
 57        self._already_begun = True
 58        try:
 59            while not self._need_to_stop:
 60                self.method.loop_iteration()
 61        finally:
 62            self._already_begun = False
 63            if destroy_on_finish:
 64                self.destroy()
 65
 66    def stop(self):
 67        self._need_to_stop = True
 68
 69    def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
 70        new_connection = None
 71        if ConnectionType.passive == connection_info.connection_type:
 72            new_connection = self._make_passive_connection(connection_info, name)
 73        elif ConnectionType.active_connected == connection_info.connection_type:
 74            new_connection = self._make_active_connected_connection(connection_info, name)
 75        else:
 76            raise WrongConnectionType()
 77        return new_connection
 78
 79    def add_connection(self, connection: Connection):
 80        self.all_connections.add(connection)
 81        if ConnectionType.passive == connection.connection_info.connection_type:
 82            self.passive_connections.add(connection)
 83        self.connection_by_id[connection.connection_id] = connection
 84        if connection.connection_name is not None:
 85            self.connection_by_name[connection.connection_name] = connection
 86        self.connection_by_fileno[connection.conn.fileno()] = connection
 87
 88        if connection.worker_obj.api is None:
 89            connection.worker_obj.api = self
 90        if connection.worker_obj.connection is None:
 91            connection.worker_obj.connection = connection
 92
 93        self.method.add_connection(connection.conn)
 94        self.check_is_connection_need_to_sent_data(connection)
 95
 96    def remove_connection(self, connection: Connection):
 97        connection.connection_state = ConnectionState.waiting_for_disconnection
 98        self.method.set__should_be_closed(connection.conn)
 99
100    def on_accept_connection(self, connection):
101        new_conn = None
102        try:
103            conn_and_address_pair = connection.conn.accept()
104            new_conn, new_address = conn_and_address_pair
105            new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair)
106            self.add_connection(new_connection)
107            try:
108                new_connection.worker_obj.on_connect()
109                self.check_is_connection_need_to_sent_data(new_connection)
110            except:
111                if __debug__: self.log_exception()
112                self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault)
113        except BlockingIOError:
114            pass
115        except:
116            if __debug__: self.log_exception()
117            if new_conn is not None:
118                self.method.should_be_closed.add(new_conn)
119
120    def on_connected(self, connection: Connection):
121        connection.connection_state = ConnectionState.connected
122        try:
123            connection.worker_obj.on_connect()
124            self.check_is_connection_need_to_sent_data(connection)
125        except:
126            if __debug__: self.log_exception()
127            self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
128
129    def on_read(self, connection: Connection):
130        try:
131            another_read_data_part = connection.conn.recv(1024)
132            if another_read_data_part:
133                connection.read_data += another_read_data_part
134                try:
135                    connection.worker_obj.on_read()
136                    self.check_is_connection_need_to_sent_data(connection)
137                except:
138                    if __debug__: self.log_exception()
139                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
140            else:
141                self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
142        except BlockingIOError:
143            pass
144        except:
145            if __debug__: self.log_exception()
146            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
147
148    def on_write(self, connection: Connection):
149        try:
150            if connection.must_be_written_data:
151                nsent = connection.conn.send(connection.must_be_written_data)
152                connection.must_be_written_data = connection.must_be_written_data[nsent:]
153            if not connection.must_be_written_data:
154                try:
155                    connection.worker_obj.on_no_more_data_to_write()
156                    self.check_is_connection_need_to_sent_data(connection)
157                except:
158                    if __debug__: self.log_exception()
159                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
160        except BlockingIOError:
161            pass
162        except:
163            if __debug__: self.log_exception()
164            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
165
166    def on_close(self, connection: Connection):
167        self._remove_connection_from_internal_structures(connection)
168        connection.conn.close()
169        connection.connection_state = ConnectionState.disconnected
170        try:
171            connection.worker_obj.on_connection_lost()
172        except:
173            if __debug__: self.log_exception()
174            pass
175
176    def _get_new_connection_id(self):
177        result = self._new_connection_id
178        self._new_connection_id += 1
179        return result
180
181    def _construct_active_accepted_connection(self, base_passive_connection: Connection,
182                                              conn_and_address_pair: tuple):
183        conn, address = conn_and_address_pair
184        new_worker_obj = copy.copy(base_passive_connection.worker_obj)
185        new_worker_obj.api = None
186        new_worker_obj.connection = None
187        new_connection_info = ConnectionInfo(new_worker_obj,
188                                             ConnectionType.active_accepted,
189                                             address, conn.family, conn.type, conn.proto)
190        new_connection = Connection(self._get_new_connection_id(), new_connection_info, conn_and_address_pair,
191                                    ConnectionState.connected)
192        return new_connection
193
194    def _make_active_connected_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
195        conn = None
196        try:
197            conn = socket.socket(connection_info.socket_family, connection_info.socket_type,
198                                 connection_info.socket_protocol, connection_info.socket_fileno)
199            conn.setblocking(0)
200            conn.connect(connection_info.socket_address)
201        except (socket.error, OSError) as err:
202            if err.errno not in {errno.EINPROGRESS, errno.EAGAIN}:
203                conn.close()
204                raise err
205        conn_and_address_pair = (conn, connection_info.socket_address)
206        new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair,
207                                    ConnectionState.waiting_for_connection, name)
208        self.add_connection(new_connection)
209        self.method.set__need_write(new_connection.conn, True)
210        return new_connection
211
212    def _make_passive_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
213        conn = None
214        try:
215            conn = socket.socket(connection_info.socket_family, connection_info.socket_type,
216                                 connection_info.socket_protocol, connection_info.socket_fileno)
217            conn.setblocking(0)
218            conn.bind(connection_info.socket_address)
219            conn.listen(connection_info.backlog)
220        except:
221            conn.close()
222            raise
223        conn_and_address_pair = (conn, connection_info.socket_address)
224        new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair,
225                                    ConnectionState.connected, name)
226        self.add_connection(new_connection)
227
228        try:
229            new_connection.worker_obj.on_connect()
230            self.check_is_connection_need_to_sent_data(new_connection)
231        except:
232            if __debug__: self.log_exception()
233            self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault)
234
235        return new_connection
236
237    def _remove_connection_from_internal_structures(self, connection: Connection):
238        if connection in self.all_connections:
239            self.all_connections.remove(connection)
240        if connection in self.passive_connections:
241            self.passive_connections.remove(connection)
242        if connection.connection_id in self.connection_by_id:
243            del self.connection_by_id[connection.connection_id]
244        if connection.connection_name is not None:
245            if connection.connection_name in self.connection_by_name:
246                del self.connection_by_name[connection.connection_name]
247        if connection.conn.fileno() in self.connection_by_fileno:
248            del self.connection_by_fileno[connection.conn.fileno()]
249        self.method.remove_connection(connection.conn)
250
251    def _set_connection_to_be_closed(self, connection: Connection, state: ConnectionState):
252        connection.connection_state = state
253        self.method.set__should_be_closed(connection.conn)
254
255    def check_is_connection_need_to_sent_data(self, connection: Connection):
256        if connection.must_be_written_data or connection.force_write_call:
257            if not isinstance(connection.must_be_written_data, memoryview):
258                connection.must_be_written_data = memoryview(connection.must_be_written_data)
259            self.method.set__need_write(connection.conn, True)
260        else:
261            self.method.set__need_write(connection.conn, False)
262
263    def log_exception(self):
264        if not self.need_to_print_exceptions_info:
265            return
266        exc = sys.exc_info()
267        exception = exc
268        error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
269        formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
270        exception = exception[:2] + (formatted_traceback,)
271        trace_str = ''.join(exception[2])
272        result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
273        print(result_string)
class NetIO(cengal.io.net_io.versions.v_0.net_io_abstract.NetIOBase):
 40class NetIO(NetIOBase):
 41    def __init__(self, transport):
 42        super().__init__(transport)
 43
 44        self._need_to_stop = False
 45        self._already_begun = False
 46
 47        self._new_connection_id = 0
 48
 49        self.need_to_print_exceptions_info = False
 50
 51    def destroy(self):
 52        self.method.destroy()
 53
 54    def start(self, destroy_on_finish=True):
 55        if self._already_begun:
 56            raise LoopIsAlreadyBegun()
 57
 58        self._already_begun = True
 59        try:
 60            while not self._need_to_stop:
 61                self.method.loop_iteration()
 62        finally:
 63            self._already_begun = False
 64            if destroy_on_finish:
 65                self.destroy()
 66
 67    def stop(self):
 68        self._need_to_stop = True
 69
 70    def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
 71        new_connection = None
 72        if ConnectionType.passive == connection_info.connection_type:
 73            new_connection = self._make_passive_connection(connection_info, name)
 74        elif ConnectionType.active_connected == connection_info.connection_type:
 75            new_connection = self._make_active_connected_connection(connection_info, name)
 76        else:
 77            raise WrongConnectionType()
 78        return new_connection
 79
 80    def add_connection(self, connection: Connection):
 81        self.all_connections.add(connection)
 82        if ConnectionType.passive == connection.connection_info.connection_type:
 83            self.passive_connections.add(connection)
 84        self.connection_by_id[connection.connection_id] = connection
 85        if connection.connection_name is not None:
 86            self.connection_by_name[connection.connection_name] = connection
 87        self.connection_by_fileno[connection.conn.fileno()] = connection
 88
 89        if connection.worker_obj.api is None:
 90            connection.worker_obj.api = self
 91        if connection.worker_obj.connection is None:
 92            connection.worker_obj.connection = connection
 93
 94        self.method.add_connection(connection.conn)
 95        self.check_is_connection_need_to_sent_data(connection)
 96
 97    def remove_connection(self, connection: Connection):
 98        connection.connection_state = ConnectionState.waiting_for_disconnection
 99        self.method.set__should_be_closed(connection.conn)
100
101    def on_accept_connection(self, connection):
102        new_conn = None
103        try:
104            conn_and_address_pair = connection.conn.accept()
105            new_conn, new_address = conn_and_address_pair
106            new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair)
107            self.add_connection(new_connection)
108            try:
109                new_connection.worker_obj.on_connect()
110                self.check_is_connection_need_to_sent_data(new_connection)
111            except:
112                if __debug__: self.log_exception()
113                self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault)
114        except BlockingIOError:
115            pass
116        except:
117            if __debug__: self.log_exception()
118            if new_conn is not None:
119                self.method.should_be_closed.add(new_conn)
120
121    def on_connected(self, connection: Connection):
122        connection.connection_state = ConnectionState.connected
123        try:
124            connection.worker_obj.on_connect()
125            self.check_is_connection_need_to_sent_data(connection)
126        except:
127            if __debug__: self.log_exception()
128            self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
129
130    def on_read(self, connection: Connection):
131        try:
132            another_read_data_part = connection.conn.recv(1024)
133            if another_read_data_part:
134                connection.read_data += another_read_data_part
135                try:
136                    connection.worker_obj.on_read()
137                    self.check_is_connection_need_to_sent_data(connection)
138                except:
139                    if __debug__: self.log_exception()
140                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
141            else:
142                self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
143        except BlockingIOError:
144            pass
145        except:
146            if __debug__: self.log_exception()
147            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
148
149    def on_write(self, connection: Connection):
150        try:
151            if connection.must_be_written_data:
152                nsent = connection.conn.send(connection.must_be_written_data)
153                connection.must_be_written_data = connection.must_be_written_data[nsent:]
154            if not connection.must_be_written_data:
155                try:
156                    connection.worker_obj.on_no_more_data_to_write()
157                    self.check_is_connection_need_to_sent_data(connection)
158                except:
159                    if __debug__: self.log_exception()
160                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
161        except BlockingIOError:
162            pass
163        except:
164            if __debug__: self.log_exception()
165            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
166
167    def on_close(self, connection: Connection):
168        self._remove_connection_from_internal_structures(connection)
169        connection.conn.close()
170        connection.connection_state = ConnectionState.disconnected
171        try:
172            connection.worker_obj.on_connection_lost()
173        except:
174            if __debug__: self.log_exception()
175            pass
176
177    def _get_new_connection_id(self):
178        result = self._new_connection_id
179        self._new_connection_id += 1
180        return result
181
182    def _construct_active_accepted_connection(self, base_passive_connection: Connection,
183                                              conn_and_address_pair: tuple):
184        conn, address = conn_and_address_pair
185        new_worker_obj = copy.copy(base_passive_connection.worker_obj)
186        new_worker_obj.api = None
187        new_worker_obj.connection = None
188        new_connection_info = ConnectionInfo(new_worker_obj,
189                                             ConnectionType.active_accepted,
190                                             address, conn.family, conn.type, conn.proto)
191        new_connection = Connection(self._get_new_connection_id(), new_connection_info, conn_and_address_pair,
192                                    ConnectionState.connected)
193        return new_connection
194
195    def _make_active_connected_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
196        conn = None
197        try:
198            conn = socket.socket(connection_info.socket_family, connection_info.socket_type,
199                                 connection_info.socket_protocol, connection_info.socket_fileno)
200            conn.setblocking(0)
201            conn.connect(connection_info.socket_address)
202        except (socket.error, OSError) as err:
203            if err.errno not in {errno.EINPROGRESS, errno.EAGAIN}:
204                conn.close()
205                raise err
206        conn_and_address_pair = (conn, connection_info.socket_address)
207        new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair,
208                                    ConnectionState.waiting_for_connection, name)
209        self.add_connection(new_connection)
210        self.method.set__need_write(new_connection.conn, True)
211        return new_connection
212
213    def _make_passive_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
214        conn = None
215        try:
216            conn = socket.socket(connection_info.socket_family, connection_info.socket_type,
217                                 connection_info.socket_protocol, connection_info.socket_fileno)
218            conn.setblocking(0)
219            conn.bind(connection_info.socket_address)
220            conn.listen(connection_info.backlog)
221        except:
222            conn.close()
223            raise
224        conn_and_address_pair = (conn, connection_info.socket_address)
225        new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair,
226                                    ConnectionState.connected, name)
227        self.add_connection(new_connection)
228
229        try:
230            new_connection.worker_obj.on_connect()
231            self.check_is_connection_need_to_sent_data(new_connection)
232        except:
233            if __debug__: self.log_exception()
234            self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault)
235
236        return new_connection
237
238    def _remove_connection_from_internal_structures(self, connection: Connection):
239        if connection in self.all_connections:
240            self.all_connections.remove(connection)
241        if connection in self.passive_connections:
242            self.passive_connections.remove(connection)
243        if connection.connection_id in self.connection_by_id:
244            del self.connection_by_id[connection.connection_id]
245        if connection.connection_name is not None:
246            if connection.connection_name in self.connection_by_name:
247                del self.connection_by_name[connection.connection_name]
248        if connection.conn.fileno() in self.connection_by_fileno:
249            del self.connection_by_fileno[connection.conn.fileno()]
250        self.method.remove_connection(connection.conn)
251
252    def _set_connection_to_be_closed(self, connection: Connection, state: ConnectionState):
253        connection.connection_state = state
254        self.method.set__should_be_closed(connection.conn)
255
256    def check_is_connection_need_to_sent_data(self, connection: Connection):
257        if connection.must_be_written_data or connection.force_write_call:
258            if not isinstance(connection.must_be_written_data, memoryview):
259                connection.must_be_written_data = memoryview(connection.must_be_written_data)
260            self.method.set__need_write(connection.conn, True)
261        else:
262            self.method.set__need_write(connection.conn, False)
263
264    def log_exception(self):
265        if not self.need_to_print_exceptions_info:
266            return
267        exc = sys.exc_info()
268        exception = exc
269        error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
270        formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
271        exception = exception[:2] + (formatted_traceback,)
272        trace_str = ''.join(exception[2])
273        result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
274        print(result_string)

Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).

NetIO(transport)
41    def __init__(self, transport):
42        super().__init__(transport)
43
44        self._need_to_stop = False
45        self._already_begun = False
46
47        self._new_connection_id = 0
48
49        self.need_to_print_exceptions_info = False

:param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself

need_to_print_exceptions_info
def destroy(self):
51    def destroy(self):
52        self.method.destroy()
def start(self, destroy_on_finish=True):
54    def start(self, destroy_on_finish=True):
55        if self._already_begun:
56            raise LoopIsAlreadyBegun()
57
58        self._already_begun = True
59        try:
60            while not self._need_to_stop:
61                self.method.loop_iteration()
62        finally:
63            self._already_begun = False
64            if destroy_on_finish:
65                self.destroy()

Starts the IO loop. :param destroy_on_finish: if True, destroy() will be called from inside this method when the loop finishes. :return:

def stop(self):
67    def stop(self):
68        self._need_to_stop = True

Will initiate IO loop stop process :return:

def make_connection( self, connection_info: cengal.io.net_io.versions.v_0.net_io_abstract.ConnectionInfo = None, name=None) -> cengal.io.net_io.versions.v_0.net_io_abstract.Connection:
70    def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection:
71        new_connection = None
72        if ConnectionType.passive == connection_info.connection_type:
73            new_connection = self._make_passive_connection(connection_info, name)
74        elif ConnectionType.active_connected == connection_info.connection_type:
75            new_connection = self._make_active_connected_connection(connection_info, name)
76        else:
77            raise WrongConnectionType()
78        return new_connection

Creates a connection from the given connection_info object. The connection will then be established, either immediately or with a delay, depending on the connection type:

  • ConnectionType.passive - immediate;
  • ConnectionType.active_connected - delayed. In both cases WorkerBase.on_connect will be called immediately after the connection has been successfully established (IF it is successfully established). :param connection_info: the new connection will be created using the information provided in the connection_info object. See ConnectionInfo() for more information. :param name: name of the connection. If you provide it, you will be able to find this connection by its name in the NetIOUserApi.connection_by_name dictionary. :return:
def add_connection( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
80    def add_connection(self, connection: Connection):
81        self.all_connections.add(connection)
82        if ConnectionType.passive == connection.connection_info.connection_type:
83            self.passive_connections.add(connection)
84        self.connection_by_id[connection.connection_id] = connection
85        if connection.connection_name is not None:
86            self.connection_by_name[connection.connection_name] = connection
87        self.connection_by_fileno[connection.conn.fileno()] = connection
88
89        if connection.worker_obj.api is None:
90            connection.worker_obj.api = self
91        if connection.worker_obj.connection is None:
92            connection.worker_obj.connection = connection
93
94        self.method.add_connection(connection.conn)
95        self.check_is_connection_need_to_sent_data(connection)

Will register already established connection. You need to use this method for example if you have already connected socket :param connection: :return:

def remove_connection( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
97    def remove_connection(self, connection: Connection):
98        connection.connection_state = ConnectionState.waiting_for_disconnection
99        self.method.set__should_be_closed(connection.conn)

Will close and remove connection :param connection: :return:

def on_accept_connection(self, connection):
101    def on_accept_connection(self, connection):
102        new_conn = None
103        try:
104            conn_and_address_pair = connection.conn.accept()
105            new_conn, new_address = conn_and_address_pair
106            new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair)
107            self.add_connection(new_connection)
108            try:
109                new_connection.worker_obj.on_connect()
110                self.check_is_connection_need_to_sent_data(new_connection)
111            except:
112                if __debug__: self.log_exception()
113                self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault)
114        except BlockingIOError:
115            pass
116        except:
117            if __debug__: self.log_exception()
118            if new_conn is not None:
119                self.method.should_be_closed.add(new_conn)
def on_connected( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
121    def on_connected(self, connection: Connection):
122        connection.connection_state = ConnectionState.connected
123        try:
124            connection.worker_obj.on_connect()
125            self.check_is_connection_need_to_sent_data(connection)
126        except:
127            if __debug__: self.log_exception()
128            self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
def on_read( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
130    def on_read(self, connection: Connection):
131        try:
132            another_read_data_part = connection.conn.recv(1024)
133            if another_read_data_part:
134                connection.read_data += another_read_data_part
135                try:
136                    connection.worker_obj.on_read()
137                    self.check_is_connection_need_to_sent_data(connection)
138                except:
139                    if __debug__: self.log_exception()
140                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
141            else:
142                self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
143        except BlockingIOError:
144            pass
145        except:
146            if __debug__: self.log_exception()
147            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
def on_write( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
149    def on_write(self, connection: Connection):
150        try:
151            if connection.must_be_written_data:
152                nsent = connection.conn.send(connection.must_be_written_data)
153                connection.must_be_written_data = connection.must_be_written_data[nsent:]
154            if not connection.must_be_written_data:
155                try:
156                    connection.worker_obj.on_no_more_data_to_write()
157                    self.check_is_connection_need_to_sent_data(connection)
158                except:
159                    if __debug__: self.log_exception()
160                    self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
161        except BlockingIOError:
162            pass
163        except:
164            if __debug__: self.log_exception()
165            self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
def on_close( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
167    def on_close(self, connection: Connection):
168        self._remove_connection_from_internal_structures(connection)
169        connection.conn.close()
170        connection.connection_state = ConnectionState.disconnected
171        try:
172            connection.worker_obj.on_connection_lost()
173        except:
174            if __debug__: self.log_exception()
175            pass
def check_is_connection_need_to_sent_data( self, connection: cengal.io.net_io.versions.v_0.net_io_abstract.Connection):
256    def check_is_connection_need_to_sent_data(self, connection: Connection):
257        if connection.must_be_written_data or connection.force_write_call:
258            if not isinstance(connection.must_be_written_data, memoryview):
259                connection.must_be_written_data = memoryview(connection.must_be_written_data)
260            self.method.set__need_write(connection.conn, True)
261        else:
262            self.method.set__need_write(connection.conn, False)

Checks whether the connection has output data pending. It is called automatically by the IO loop after EACH WorkerBase callback call. But if you are filling another connection's output buffer, you will need to make this call for that connection yourself. :param connection: :return:

def log_exception(self):
264    def log_exception(self):
265        if not self.need_to_print_exceptions_info:
266            return
267        exc = sys.exc_info()
268        exception = exc
269        error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
270        formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
271        exception = exception[:2] + (formatted_traceback,)
272        trace_str = ''.join(exception[2])
273        result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
274        print(result_string)
Inherited Members
cengal.io.net_io.versions.v_0.net_io_abstract.NetIOBase
method
cengal.io.net_io.versions.v_0.net_io_abstract.NetIOUserApi
all_connections
passive_connections
connection_by_id
connection_by_name
connection_by_fileno