cengal.io.net_io.versions.v_0.net_io__linux
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18from .net_io_abstract import * 19import sys 20import traceback 21 22""" 23Module Docstring 24Docstrings: http://www.python.org/dev/peps/pep-0257/ 25""" 26 27__author__ = "ButenkoMS <gtalk@butenkoms.space>" 28__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 29__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 30__license__ = "Apache License, Version 2.0" 31__version__ = "4.4.1" 32__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 33__email__ = "gtalk@butenkoms.space" 34# __status__ = "Prototype" 35__status__ = "Development" 36# __status__ = "Production" 37 38 39class NetIO(NetIOBase): 40 def __init__(self, transport): 41 super().__init__(transport) 42 43 self._need_to_stop = False 44 self._already_begun = False 45 46 self._new_connection_id = 0 47 48 self.need_to_print_exceptions_info = False 49 50 def destroy(self): 51 self.method.destroy() 52 53 def start(self, destroy_on_finish=True): 54 if self._already_begun: 55 raise LoopIsAlreadyBegun() 56 57 self._already_begun = True 58 try: 59 while not self._need_to_stop: 60 self.method.loop_iteration() 61 finally: 62 self._already_begun = False 63 if destroy_on_finish: 64 self.destroy() 65 66 def stop(self): 67 self._need_to_stop = True 68 69 def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 70 new_connection = None 71 if ConnectionType.passive == connection_info.connection_type: 72 new_connection = self._make_passive_connection(connection_info, name) 73 elif ConnectionType.active_connected == connection_info.connection_type: 74 new_connection = self._make_active_connected_connection(connection_info, name) 75 else: 76 raise WrongConnectionType() 77 return new_connection 78 79 def add_connection(self, connection: Connection): 80 self.all_connections.add(connection) 81 if ConnectionType.passive == connection.connection_info.connection_type: 82 self.passive_connections.add(connection) 83 self.connection_by_id[connection.connection_id] = connection 84 if connection.connection_name is not None: 85 self.connection_by_name[connection.connection_name] = connection 86 self.connection_by_fileno[connection.conn.fileno()] = connection 87 88 if connection.worker_obj.api is None: 89 connection.worker_obj.api = self 90 if connection.worker_obj.connection is None: 91 connection.worker_obj.connection = connection 92 93 self.method.add_connection(connection.conn) 94 self.check_is_connection_need_to_sent_data(connection) 95 96 def remove_connection(self, connection: Connection): 97 connection.connection_state = ConnectionState.waiting_for_disconnection 98 self.method.set__should_be_closed(connection.conn) 99 100 def on_accept_connection(self, connection): 101 new_conn = None 102 try: 103 conn_and_address_pair = connection.conn.accept() 104 new_conn, new_address = conn_and_address_pair 105 new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair) 106 self.add_connection(new_connection) 107 try: 108 new_connection.worker_obj.on_connect() 109 self.check_is_connection_need_to_sent_data(new_connection) 110 except: 111 if __debug__: self.log_exception() 112 self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault) 113 except BlockingIOError: 114 pass 115 except: 116 if __debug__: self.log_exception() 117 if new_conn is not None: 118 self.method.should_be_closed.add(new_conn) 119 120 def on_connected(self, connection: Connection): 121 connection.connection_state = ConnectionState.connected 122 try: 123 connection.worker_obj.on_connect() 124 self.check_is_connection_need_to_sent_data(connection) 125 except: 126 if __debug__: self.log_exception() 127 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 128 129 def on_read(self, connection: Connection): 130 try: 131 another_read_data_part = connection.conn.recv(1024) 132 if another_read_data_part: 133 connection.read_data += another_read_data_part 134 try: 135 connection.worker_obj.on_read() 136 self.check_is_connection_need_to_sent_data(connection) 137 except: 138 if __debug__: self.log_exception() 139 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 140 else: 141 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 142 except BlockingIOError: 143 pass 144 except: 145 if __debug__: self.log_exception() 146 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 147 148 def on_write(self, connection: Connection): 149 try: 150 if connection.must_be_written_data: 151 nsent = connection.conn.send(connection.must_be_written_data) 152 connection.must_be_written_data = connection.must_be_written_data[nsent:] 153 if not connection.must_be_written_data: 154 try: 155 connection.worker_obj.on_no_more_data_to_write() 156 self.check_is_connection_need_to_sent_data(connection) 157 except: 158 if __debug__: self.log_exception() 159 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 160 except BlockingIOError: 161 pass 162 except: 163 if __debug__: self.log_exception() 164 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 165 166 def on_close(self, connection: Connection): 167 self._remove_connection_from_internal_structures(connection) 168 connection.conn.close() 169 connection.connection_state = ConnectionState.disconnected 170 try: 171 connection.worker_obj.on_connection_lost() 172 except: 173 if __debug__: self.log_exception() 174 pass 175 176 def _get_new_connection_id(self): 177 result = self._new_connection_id 178 self._new_connection_id += 1 179 return result 180 181 def _construct_active_accepted_connection(self, base_passive_connection: Connection, 182 conn_and_address_pair: tuple): 183 conn, address = conn_and_address_pair 184 new_worker_obj = copy.copy(base_passive_connection.worker_obj) 185 new_worker_obj.api = None 186 new_worker_obj.connection = None 187 new_connection_info = ConnectionInfo(new_worker_obj, 188 ConnectionType.active_accepted, 189 address, conn.family, conn.type, conn.proto) 190 new_connection = Connection(self._get_new_connection_id(), new_connection_info, conn_and_address_pair, 191 ConnectionState.connected) 192 return new_connection 193 194 def _make_active_connected_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 195 conn = None 196 try: 197 conn = socket.socket(connection_info.socket_family, connection_info.socket_type, 198 connection_info.socket_protocol, connection_info.socket_fileno) 199 conn.setblocking(0) 200 conn.connect(connection_info.socket_address) 201 except (socket.error, OSError) as err: 202 if err.errno not in {errno.EINPROGRESS, errno.EAGAIN}: 203 conn.close() 204 raise err 205 conn_and_address_pair = (conn, connection_info.socket_address) 206 new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair, 207 ConnectionState.waiting_for_connection, name) 208 self.add_connection(new_connection) 209 self.method.set__need_write(new_connection.conn, True) 210 return new_connection 211 212 def _make_passive_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 213 conn = None 214 try: 215 conn = socket.socket(connection_info.socket_family, connection_info.socket_type, 216 connection_info.socket_protocol, connection_info.socket_fileno) 217 conn.setblocking(0) 218 conn.bind(connection_info.socket_address) 219 conn.listen(connection_info.backlog) 220 except: 221 conn.close() 222 raise 223 conn_and_address_pair = (conn, connection_info.socket_address) 224 new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair, 225 ConnectionState.connected, name) 226 self.add_connection(new_connection) 227 228 try: 229 new_connection.worker_obj.on_connect() 230 self.check_is_connection_need_to_sent_data(new_connection) 231 except: 232 if __debug__: self.log_exception() 233 self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault) 234 235 return new_connection 236 237 def _remove_connection_from_internal_structures(self, connection: Connection): 238 if connection in self.all_connections: 239 self.all_connections.remove(connection) 240 if connection in self.passive_connections: 241 self.passive_connections.remove(connection) 242 if connection.connection_id in self.connection_by_id: 243 del self.connection_by_id[connection.connection_id] 244 if connection.connection_name is not None: 245 if connection.connection_name in self.connection_by_name: 246 del self.connection_by_name[connection.connection_name] 247 if connection.conn.fileno() in self.connection_by_fileno: 248 del self.connection_by_fileno[connection.conn.fileno()] 249 self.method.remove_connection(connection.conn) 250 251 def _set_connection_to_be_closed(self, connection: Connection, state: ConnectionState): 252 connection.connection_state = state 253 self.method.set__should_be_closed(connection.conn) 254 255 def check_is_connection_need_to_sent_data(self, connection: Connection): 256 if connection.must_be_written_data or connection.force_write_call: 257 if not isinstance(connection.must_be_written_data, memoryview): 258 connection.must_be_written_data = memoryview(connection.must_be_written_data) 259 self.method.set__need_write(connection.conn, True) 260 else: 261 self.method.set__need_write(connection.conn, False) 262 263 def log_exception(self): 264 if not self.need_to_print_exceptions_info: 265 return 266 exc = sys.exc_info() 267 exception = exc 268 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 269 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 270 exception = exception[:2] + (formatted_traceback,) 271 trace_str = ''.join(exception[2]) 272 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 273 print(result_string)
40class NetIO(NetIOBase): 41 def __init__(self, transport): 42 super().__init__(transport) 43 44 self._need_to_stop = False 45 self._already_begun = False 46 47 self._new_connection_id = 0 48 49 self.need_to_print_exceptions_info = False 50 51 def destroy(self): 52 self.method.destroy() 53 54 def start(self, destroy_on_finish=True): 55 if self._already_begun: 56 raise LoopIsAlreadyBegun() 57 58 self._already_begun = True 59 try: 60 while not self._need_to_stop: 61 self.method.loop_iteration() 62 finally: 63 self._already_begun = False 64 if destroy_on_finish: 65 self.destroy() 66 67 def stop(self): 68 self._need_to_stop = True 69 70 def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 71 new_connection = None 72 if ConnectionType.passive == connection_info.connection_type: 73 new_connection = self._make_passive_connection(connection_info, name) 74 elif ConnectionType.active_connected == connection_info.connection_type: 75 new_connection = self._make_active_connected_connection(connection_info, name) 76 else: 77 raise WrongConnectionType() 78 return new_connection 79 80 def add_connection(self, connection: Connection): 81 self.all_connections.add(connection) 82 if ConnectionType.passive == connection.connection_info.connection_type: 83 self.passive_connections.add(connection) 84 self.connection_by_id[connection.connection_id] = connection 85 if connection.connection_name is not None: 86 self.connection_by_name[connection.connection_name] = connection 87 self.connection_by_fileno[connection.conn.fileno()] = connection 88 89 if connection.worker_obj.api is None: 90 connection.worker_obj.api = self 91 if connection.worker_obj.connection is None: 92 connection.worker_obj.connection = connection 93 94 self.method.add_connection(connection.conn) 95 self.check_is_connection_need_to_sent_data(connection) 96 97 def remove_connection(self, connection: Connection): 98 connection.connection_state = ConnectionState.waiting_for_disconnection 99 self.method.set__should_be_closed(connection.conn) 100 101 def on_accept_connection(self, connection): 102 new_conn = None 103 try: 104 conn_and_address_pair = connection.conn.accept() 105 new_conn, new_address = conn_and_address_pair 106 new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair) 107 self.add_connection(new_connection) 108 try: 109 new_connection.worker_obj.on_connect() 110 self.check_is_connection_need_to_sent_data(new_connection) 111 except: 112 if __debug__: self.log_exception() 113 self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault) 114 except BlockingIOError: 115 pass 116 except: 117 if __debug__: self.log_exception() 118 if new_conn is not None: 119 self.method.should_be_closed.add(new_conn) 120 121 def on_connected(self, connection: Connection): 122 connection.connection_state = ConnectionState.connected 123 try: 124 connection.worker_obj.on_connect() 125 self.check_is_connection_need_to_sent_data(connection) 126 except: 127 if __debug__: self.log_exception() 128 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 129 130 def on_read(self, connection: Connection): 131 try: 132 another_read_data_part = connection.conn.recv(1024) 133 if another_read_data_part: 134 connection.read_data += another_read_data_part 135 try: 136 connection.worker_obj.on_read() 137 self.check_is_connection_need_to_sent_data(connection) 138 except: 139 if __debug__: self.log_exception() 140 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 141 else: 142 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 143 except BlockingIOError: 144 pass 145 except: 146 if __debug__: self.log_exception() 147 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 148 149 def on_write(self, connection: Connection): 150 try: 151 if connection.must_be_written_data: 152 nsent = connection.conn.send(connection.must_be_written_data) 153 connection.must_be_written_data = connection.must_be_written_data[nsent:] 154 if not connection.must_be_written_data: 155 try: 156 connection.worker_obj.on_no_more_data_to_write() 157 self.check_is_connection_need_to_sent_data(connection) 158 except: 159 if __debug__: self.log_exception() 160 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 161 except BlockingIOError: 162 pass 163 except: 164 if __debug__: self.log_exception() 165 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 166 167 def on_close(self, connection: Connection): 168 self._remove_connection_from_internal_structures(connection) 169 connection.conn.close() 170 connection.connection_state = ConnectionState.disconnected 171 try: 172 connection.worker_obj.on_connection_lost() 173 except: 174 if __debug__: self.log_exception() 175 pass 176 177 def _get_new_connection_id(self): 178 result = self._new_connection_id 179 self._new_connection_id += 1 180 return result 181 182 def _construct_active_accepted_connection(self, base_passive_connection: Connection, 183 conn_and_address_pair: tuple): 184 conn, address = conn_and_address_pair 185 new_worker_obj = copy.copy(base_passive_connection.worker_obj) 186 new_worker_obj.api = None 187 new_worker_obj.connection = None 188 new_connection_info = ConnectionInfo(new_worker_obj, 189 ConnectionType.active_accepted, 190 address, conn.family, conn.type, conn.proto) 191 new_connection = Connection(self._get_new_connection_id(), new_connection_info, conn_and_address_pair, 192 ConnectionState.connected) 193 return new_connection 194 195 def _make_active_connected_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 196 conn = None 197 try: 198 conn = socket.socket(connection_info.socket_family, connection_info.socket_type, 199 connection_info.socket_protocol, connection_info.socket_fileno) 200 conn.setblocking(0) 201 conn.connect(connection_info.socket_address) 202 except (socket.error, OSError) as err: 203 if err.errno not in {errno.EINPROGRESS, errno.EAGAIN}: 204 conn.close() 205 raise err 206 conn_and_address_pair = (conn, connection_info.socket_address) 207 new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair, 208 ConnectionState.waiting_for_connection, name) 209 self.add_connection(new_connection) 210 self.method.set__need_write(new_connection.conn, True) 211 return new_connection 212 213 def _make_passive_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 214 conn = None 215 try: 216 conn = socket.socket(connection_info.socket_family, connection_info.socket_type, 217 connection_info.socket_protocol, connection_info.socket_fileno) 218 conn.setblocking(0) 219 conn.bind(connection_info.socket_address) 220 conn.listen(connection_info.backlog) 221 except: 222 conn.close() 223 raise 224 conn_and_address_pair = (conn, connection_info.socket_address) 225 new_connection = Connection(self._get_new_connection_id(), connection_info, conn_and_address_pair, 226 ConnectionState.connected, name) 227 self.add_connection(new_connection) 228 229 try: 230 new_connection.worker_obj.on_connect() 231 self.check_is_connection_need_to_sent_data(new_connection) 232 except: 233 if __debug__: self.log_exception() 234 self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault) 235 236 return new_connection 237 238 def _remove_connection_from_internal_structures(self, connection: Connection): 239 if connection in self.all_connections: 240 self.all_connections.remove(connection) 241 if connection in self.passive_connections: 242 self.passive_connections.remove(connection) 243 if connection.connection_id in self.connection_by_id: 244 del self.connection_by_id[connection.connection_id] 245 if connection.connection_name is not None: 246 if connection.connection_name in self.connection_by_name: 247 del self.connection_by_name[connection.connection_name] 248 if connection.conn.fileno() in self.connection_by_fileno: 249 del self.connection_by_fileno[connection.conn.fileno()] 250 self.method.remove_connection(connection.conn) 251 252 def _set_connection_to_be_closed(self, connection: Connection, state: ConnectionState): 253 connection.connection_state = state 254 self.method.set__should_be_closed(connection.conn) 255 256 def check_is_connection_need_to_sent_data(self, connection: Connection): 257 if connection.must_be_written_data or connection.force_write_call: 258 if not isinstance(connection.must_be_written_data, memoryview): 259 connection.must_be_written_data = memoryview(connection.must_be_written_data) 260 self.method.set__need_write(connection.conn, True) 261 else: 262 self.method.set__need_write(connection.conn, False) 263 264 def log_exception(self): 265 if not self.need_to_print_exceptions_info: 266 return 267 exc = sys.exc_info() 268 exception = exc 269 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 270 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 271 exception = exception[:2] + (formatted_traceback,) 272 trace_str = ''.join(exception[2]) 273 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 274 print(result_string)
Base class for any IO implementation (Linux, BSD, Windows, cross platform, etc.).
41 def __init__(self, transport): 42 super().__init__(transport) 43 44 self._need_to_stop = False 45 self._already_begun = False 46 47 self._new_connection_id = 0 48 49 self.need_to_print_exceptions_info = False
:param transport: class of the desired IOMethod. Instance (object) will be created by NetIOBase itself
54 def start(self, destroy_on_finish=True): 55 if self._already_begun: 56 raise LoopIsAlreadyBegun() 57 58 self._already_begun = True 59 try: 60 while not self._need_to_stop: 61 self.method.loop_iteration() 62 finally: 63 self._already_begun = False 64 if destroy_on_finish: 65 self.destroy()
Will start IO loop :param destroy_on_finish: if True - destroy() will be called from inside of this method :return:
70 def make_connection(self, connection_info: ConnectionInfo=None, name=None)->Connection: 71 new_connection = None 72 if ConnectionType.passive == connection_info.connection_type: 73 new_connection = self._make_passive_connection(connection_info, name) 74 elif ConnectionType.active_connected == connection_info.connection_type: 75 new_connection = self._make_active_connected_connection(connection_info, name) 76 else: 77 raise WrongConnectionType() 78 return new_connection
Will create connection from given connection_info object. Than connection will be established. Immediate or delayed - depends on the connection type:
- ConnectionType.passive - immediate;
- ConnectionType.active_connected - delayed. In both cases WorkerBase.on_connect will be called immediately after connection will be successfully established (IF it will be successfully established). :param connection_info: new connection will be created using information provided in connection_info object. See ConnectionInfo() for more information :param name: name of the connection. If you'll provide it - you will be able to find this connection in NetIOUserApi.connection_by_name dictionary by it's name. :return:
80 def add_connection(self, connection: Connection): 81 self.all_connections.add(connection) 82 if ConnectionType.passive == connection.connection_info.connection_type: 83 self.passive_connections.add(connection) 84 self.connection_by_id[connection.connection_id] = connection 85 if connection.connection_name is not None: 86 self.connection_by_name[connection.connection_name] = connection 87 self.connection_by_fileno[connection.conn.fileno()] = connection 88 89 if connection.worker_obj.api is None: 90 connection.worker_obj.api = self 91 if connection.worker_obj.connection is None: 92 connection.worker_obj.connection = connection 93 94 self.method.add_connection(connection.conn) 95 self.check_is_connection_need_to_sent_data(connection)
Will register already established connection. You need to use this method for example if you have already connected socket :param connection: :return:
97 def remove_connection(self, connection: Connection): 98 connection.connection_state = ConnectionState.waiting_for_disconnection 99 self.method.set__should_be_closed(connection.conn)
Will close and remove connection :param connection: :return:
101 def on_accept_connection(self, connection): 102 new_conn = None 103 try: 104 conn_and_address_pair = connection.conn.accept() 105 new_conn, new_address = conn_and_address_pair 106 new_connection = self._construct_active_accepted_connection(connection, conn_and_address_pair) 107 self.add_connection(new_connection) 108 try: 109 new_connection.worker_obj.on_connect() 110 self.check_is_connection_need_to_sent_data(new_connection) 111 except: 112 if __debug__: self.log_exception() 113 self._set_connection_to_be_closed(new_connection, ConnectionState.worker_fault) 114 except BlockingIOError: 115 pass 116 except: 117 if __debug__: self.log_exception() 118 if new_conn is not None: 119 self.method.should_be_closed.add(new_conn)
121 def on_connected(self, connection: Connection): 122 connection.connection_state = ConnectionState.connected 123 try: 124 connection.worker_obj.on_connect() 125 self.check_is_connection_need_to_sent_data(connection) 126 except: 127 if __debug__: self.log_exception() 128 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault)
130 def on_read(self, connection: Connection): 131 try: 132 another_read_data_part = connection.conn.recv(1024) 133 if another_read_data_part: 134 connection.read_data += another_read_data_part 135 try: 136 connection.worker_obj.on_read() 137 self.check_is_connection_need_to_sent_data(connection) 138 except: 139 if __debug__: self.log_exception() 140 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 141 else: 142 self._set_connection_to_be_closed(connection, ConnectionState.io_fault) 143 except BlockingIOError: 144 pass 145 except: 146 if __debug__: self.log_exception() 147 self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
149 def on_write(self, connection: Connection): 150 try: 151 if connection.must_be_written_data: 152 nsent = connection.conn.send(connection.must_be_written_data) 153 connection.must_be_written_data = connection.must_be_written_data[nsent:] 154 if not connection.must_be_written_data: 155 try: 156 connection.worker_obj.on_no_more_data_to_write() 157 self.check_is_connection_need_to_sent_data(connection) 158 except: 159 if __debug__: self.log_exception() 160 self._set_connection_to_be_closed(connection, ConnectionState.worker_fault) 161 except BlockingIOError: 162 pass 163 except: 164 if __debug__: self.log_exception() 165 self._set_connection_to_be_closed(connection, ConnectionState.io_fault)
167 def on_close(self, connection: Connection): 168 self._remove_connection_from_internal_structures(connection) 169 connection.conn.close() 170 connection.connection_state = ConnectionState.disconnected 171 try: 172 connection.worker_obj.on_connection_lost() 173 except: 174 if __debug__: self.log_exception() 175 pass
256 def check_is_connection_need_to_sent_data(self, connection: Connection): 257 if connection.must_be_written_data or connection.force_write_call: 258 if not isinstance(connection.must_be_written_data, memoryview): 259 connection.must_be_written_data = memoryview(connection.must_be_written_data) 260 self.method.set__need_write(connection.conn, True) 261 else: 262 self.method.set__need_write(connection.conn, False)
Will check connection to output data presence. It is automatically called after EACH WorkerBase callback call by the IO loop. But if you are filling other connection's output buffer - you'll need to make this call for that connection by your self. :param connection: :return:
264 def log_exception(self): 265 if not self.need_to_print_exceptions_info: 266 return 267 exc = sys.exc_info() 268 exception = exc 269 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 270 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 271 exception = exception[:2] + (formatted_traceback,) 272 trace_str = ''.join(exception[2]) 273 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 274 print(result_string)
Inherited Members
- cengal.io.net_io.versions.v_0.net_io_abstract.NetIOBase
- method
- cengal.io.net_io.versions.v_0.net_io_abstract.NetIOUserApi
- all_connections
- passive_connections
- connection_by_id
- connection_by_name
- connection_by_fileno