cengal.io.asock_io.versions.v_1.asock_io_core
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18import copy 19import os 20import select 21import sys 22import traceback 23from collections import deque 24from contextlib import contextmanager 25# from fileinput import input 26from typing import Iterable, Set 27 28from cengal.code_flow_control.smart_values.versions.v_0 import ResultExistence 29from cengal.code_inspection.line_profiling import set_profiler 30from cengal.data_containers.fast_fifo import FIFOIsEmpty 31from cengal.data_generation.id_generator import GeneratorType, IDGenerator 32from cengal.data_manipulation.front_triggerable_variable import ( 33 FrontTriggerableVariable, FrontTriggerableVariableType) 34 35from .abstract import * 36from .base import * 37 38""" 39CAUTION: some code here is optimized for speed - not for readability or beauty. 40""" 41 42__author__ = "ButenkoMS <gtalk@butenkoms.space>" 43__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 44__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 45__license__ = "Apache License, Version 2.0" 46__version__ = "4.4.1" 47__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 48__email__ = "gtalk@butenkoms.space" 49# __status__ = "Prototype" 50__status__ = "Development" 51# __status__ = "Production" 52 53 54# set_profiler(True) 55set_profiler(False) 56 57 58class IoIterationResult: 59 """ 60 ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки 61 (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых 62 сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать 63 новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве 64 случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, 65 при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля 66 успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же 67 считывание имеющихся результатов операций))) 68 """ 69 70 def __init__(self): 71 self.newly_connected_expected_clients = set() 72 self.newly_connected_unknown_clients = set() 73 self.clients_with_disconnected_connection = set() 74 self.clients_have_data_to_read = set() 75 self.clients_with_empty_output_fifo = set() 76 77 78class ASockIOCore(NetIOCallbacks, ASockIOCoreMemoryManagement): 79 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 80 """ 81 Port should not be open to a external world! 82 :param gates_connections_settings: set() of ConnectionSettings() 83 :return: 84 """ 85 super(ASockIOCore, self).__init__() 86 87 self.gates_connections_settings = gates_connections_settings 88 if not self.gates_connections_settings: 89 self.gates_connections_settings = set() 90 # raise Exception('gates_connections_settings should be provided!') 91 for gates_connections_settings in self.gates_connections_settings: 92 gates_connections_settings.check() 93 self.faulty_connection_settings = set() 94 self._connection_settings_by_gate_conn = dict() 95 96 self.set_of_gate_addresses = set() 97 self._gate = set() 98 self.reuse_gate_addr = False 99 self.reuse_gate_port = False 100 101 self.message_size_len = MESSAGE_SIZE_LEN 102 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 103 # self.largest_small_output_data_block_size = 1024 * 4 - self.message_size_len 104 105 self._connections = dict() # key: ID; data: Connection() 106 self._connection_by_conn = dict() # key: conn; data: ID 107 self._connections_id_gen = IDGenerator() 108 109 self._connections_marked_as_ready_to_be_closed = set() 110 self._connections_marked_to_be_closed_immediately = set() 111 self._connections_marked_as_ready_to_be_deleted = set() 112 113 self._unconfirmed_clients = set() 114 115 self._we_have_connections_for_select = False 116 self._input_check_sockets = set() 117 self._output_check_sockets = set() 118 self._exception_check_sockets = set() 119 120 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 121 self._expected_clients = dict() # key: ID; data: Client() 122 self._expected_clients_id_gen = IDGenerator() 123 self._keywords_for_expected_clients = dict() # key: keyword; data: info 124 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 125 126 self.unexpected_clients_are_allowed = True 127 128 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 129 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 130 self._unexpected_clients = dict() # key: ID; data: Client() 131 self._unexpected_clients_id_gen = IDGenerator() 132 self._keywords_of_unexpected_clients = dict() 133 self._conns_of_unexpected_clients = dict() 134 135 self._io_iteration_result = IoIterationResult() 136 137 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 138 self.need_to_auto_check_incoming_raw_connection = False 139 self.unknown_clients_are_allowed = False 140 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 141 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 142 143 self.echo_log = False 144 self._internal_log = deque() 145 146 self.recv_sizes = deque() 147 self.recv_buff_sizes = deque() 148 149 self.should_get_client_addr_info_on_connection = True 150 151 self.use_nodelay_inet = False 152 self.use_speed_optimized_socket_read = False 153 154 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 155 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 156 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 157 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 158 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 159 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 160 161 self.class_for_unknown_clients_inline_processing = None 162 163 self._clients_with_inline_processors_that_need_to_apply_parameters = set() 164 165 # @profile 166 def io_iteration(self, timeout=0.0): 167 """ 168 169 :param timeout: timeout in seconds 170 :return: 171 """ 172 result = self._io_iteration_result 173 174 if self._we_have_connections_for_select: 175 need_to_repeat = True 176 177 while need_to_repeat: 178 readable, writable, exceptional = select.select(self._input_check_sockets, 179 self._output_check_sockets, 180 self._exception_check_sockets, 181 timeout) 182 read_is_forbidden = True 183 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 184 <= self.global_in__data_size_limit.result: 185 read_is_forbidden = False 186 187 # Handle inputs 188 for s in readable: 189 read_result = self._read_data_from_socket(s) 190 if read_result: 191 # read_result = self._read_messages_from_raw_input_into_fifo(s) 192 # if read_result: 193 if s in self._unconfirmed_clients: 194 self._process_client_keyword(s) 195 self._check_is_client_have_data_to_read_in_fifo(s) 196 else: 197 self._client_have_data_to_read_in_fifo(s) 198 199 if __debug__: 200 read_is_forbidden_test = \ 201 self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 202 read_is_forbidden) 203 if read_is_forbidden_test is not None: 204 if read_is_forbidden_test: 205 print('Read is suppressed until data will be processed.') 206 else: 207 print('Read is allowed: data is processed.') 208 209 # Handle outputs 210 for s in writable: 211 curr_client_info = self._connections[self._connection_by_conn[s]] 212 self._write_data_to_socket(curr_client_info) 213 # self._write_data_to_socket(s) 214 215 # Handle "exceptional conditions" 216 for s in exceptional: 217 self._handle_connection_error(s) 218 219 # Set parameters for inline processors 220 if self._clients_with_inline_processors_that_need_to_apply_parameters: 221 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 222 expected_client_info = self._expected_clients[ec_id] 223 connection_info = expected_client_info._Client__connection 224 # connection_info = self._connections.get(expected_client_info.connection_id) 225 self._inline_processor__apply_parameters(connection_info, expected_client_info) 226 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 227 228 # Close sockets 229 if self._connections_marked_to_be_closed_immediately: 230 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 231 self._connections_marked_to_be_closed_immediately = set() 232 for closeable_socket in sockets_should_be_closed_immediately: 233 connection_id = self._connection_by_conn[closeable_socket] 234 # self.close_connection_by_conn(closeable_socket) 235 self.close_connection(connection_id) 236 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 237 238 # Removing clients 239 if self._connections_marked_as_ready_to_be_deleted: 240 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 241 self._connections_marked_as_ready_to_be_deleted = set() 242 for faulty_socket in clients_ready_to_be_deleted: 243 self.remove_connection_by_conn(faulty_socket) 244 245 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 246 <= self.global_out__data_size_limit.result: 247 need_to_repeat = False 248 else: 249 need_to_repeat = True 250 251 need_to_repeat = False 252 253 if __debug__: 254 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 255 need_to_repeat) 256 if need_to_repeat_show is not None: 257 if need_to_repeat_show: 258 print('Work is suppressed until data will be out.') 259 else: 260 print('Work is allowed: data is out.') 261 262 self._io_iteration_result = IoIterationResult() 263 return result 264 265 def listen(self, backlog=1): 266 # backlog = backlog or 1 267 268 new_connection_settings = set() 269 for gate_connection_settings in self.gates_connections_settings: 270 gate = self.add_gate(gate_connection_settings) 271 if gate: 272 new_connection_settings.add(gate) 273 # gate = None 274 # try: 275 # try: 276 # gate = socket.socket(gate_connection_settings.socket_family, 277 # gate_connection_settings.socket_type, 278 # gate_connection_settings.socket_protocol, 279 # gate_connection_settings.socket_fileno) 280 # gate.setblocking(0) 281 # # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 282 # except (socket.error, OSError) as err: 283 # gate = None 284 # if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 285 # err.errno, err.strerror)) 286 # raise InternalNotCriticalError() 287 # 288 # if self.reuse_gate_port: 289 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 290 # 291 # try: 292 # self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 293 # gate.bind(gate_connection_settings.socket_address) 294 # except (socket.error, OSError) as err: 295 # gate.close() 296 # gate = None 297 # if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 298 # gate_connection_settings.socket_address, err.errno, err.strerror)) 299 # raise InternalNotCriticalError() 300 # try: 301 # gate.listen(backlog) 302 # except (socket.error, OSError) as err: 303 # gate.close() 304 # gate = None 305 # if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 306 # gate_connection_settings.socket_address, err.errno, err.strerror)) 307 # raise InternalNotCriticalError() 308 # 309 # if self.reuse_gate_addr: 310 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 311 # 312 # self._input_check_sockets.add(gate) 313 # self._exception_check_sockets.add(gate) 314 # except InternalNotCriticalError as err: 315 # gate = None 316 # finally: 317 # if gate: 318 # self._gate.add(gate) 319 # if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 320 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 321 # elif socket.AF_UNIX == gate_connection_settings.socket_family: 322 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 323 # else: 324 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 325 # self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 326 # self._connection_settings_by_gate_conn[gate] = gate_connection_settings 327 # self._we_have_connections_for_select = True 328 # new_connection_settings.add(gate_connection_settings) 329 # else: 330 # self.faulty_connection_settings.add(gate_connection_settings) 331 self.gates_connections_settings = new_connection_settings 332 333 return len(self.gates_connections_settings) 334 335 def add_gate(self, gate_connection_settings: ConnectionSettings): 336 gate_connection_settings.check() 337 338 gate = None 339 try: 340 try: 341 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 342 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 343 gate.setblocking(0) 344 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 345 except (socket.error, OSError) as err: 346 gate = None 347 if __debug__: 348 self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 349 err.errno, err.strerror)) 350 raise InternalNotCriticalError() 351 352 if self.reuse_gate_port: 353 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 354 355 try: 356 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 357 gate.bind(gate_connection_settings.socket_address) 358 except (socket.error, OSError) as err: 359 gate.close() 360 gate = None 361 if __debug__: 362 self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 363 gate_connection_settings.socket_address, err.errno, err.strerror)) 364 raise InternalNotCriticalError() 365 try: 366 gate.listen(gate_connection_settings.backlog) 367 except (socket.error, OSError) as err: 368 gate.close() 369 gate = None 370 if __debug__: 371 self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 372 gate_connection_settings.socket_address, err.errno, err.strerror)) 373 raise InternalNotCriticalError() 374 375 if self.reuse_gate_addr: 376 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 377 378 self._input_check_sockets.add(gate) 379 self._exception_check_sockets.add(gate) 380 except InternalNotCriticalError as err: 381 gate = None 382 finally: 383 if gate: 384 self._gate.add(gate) 385 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 386 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 387 elif socket.AF_UNIX == gate_connection_settings.socket_family: 388 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 389 else: 390 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 391 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 392 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 393 self._we_have_connections_for_select = True 394 return gate_connection_settings 395 else: 396 self.faulty_connection_settings.add(gate_connection_settings) 397 return None 398 399 def close_all_connections(self): 400 if __debug__: 401 self._log('CLOSE ALL CONNECTIONS:') 402 clients_list = dict(self._connections) 403 for connection_id, client_info in clients_list.items(): 404 self.close_connection(connection_id) 405 406 def remove_all_connections(self): 407 clients_list = dict(self._connections) 408 for connection_id, client_info in clients_list.items(): 409 self.remove_connection(connection_id) 410 411 def close(self): 412 for gate in self._gate: 413 gate.close() 414 415 if gate in self._input_check_sockets: 416 self._input_check_sockets.remove(gate) 417 if gate in self._exception_check_sockets: 418 self._exception_check_sockets.remove(gate) 419 420 if not self._input_check_sockets: 421 self._we_have_connections_for_select = False 422 self._unlink_good_af_unix_sockets() 423 424 def destroy(self): 425 self.close_all_connections() 426 self.remove_all_connections() 427 self.close() 428 429 def add_client(self, expected_client_info: Client): 430 """ 431 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 432 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 433 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 434 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 435 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 436 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 437 438 :param expected_client_info: link to Client() 439 :return: expected_client_id 440 """ 441 if (expected_client_info.connection_settings.keyword is None) \ 442 and (ConnectionType.active_accepted == expected_client_info.connection_settings.connection_type): 443 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 444 445 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 446 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 447 expected_client_info.connection_settings.keyword)) 448 449 expected_client_info.id = self._expected_clients_id_gen() 450 451 if self.unexpected_clients_are_allowed: 452 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 453 # клиент уже подключен 454 unexpected_client_id = self._keywords_of_unexpected_clients[ 455 expected_client_info.connection_settings.keyword] 456 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 457 connection_info = expected_client_info._Client__connection 458 # connection_info = self._connections.get(expected_client_info.connection_id) 459 if (unexpected_client_info.connection_settings.connection_type == 460 expected_client_info.connection_settings.connection_type) and \ 461 (ConnectionType.active_accepted == unexpected_client_info.connection_settings.connection_type): 462 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 463 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 464 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 465 # подключение. 466 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 467 # отключен и соединение будет установлено в новом сокете. 468 expected_client_info.connection_id = unexpected_client_info.connection_id 469 expected_client_info._Client__connection = \ 470 unexpected_client_info._Client__connection 471 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 472 connection_info.connected_expected_client_id = expected_client_info.id 473 connection_info.connected_expected_client = expected_client_info 474 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 475 else: 476 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 477 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 478 # Необходимо его отключить. 479 self._mark_connection_to_be_closed_immediately(connection_info) 480 self._mark_connection_as_ready_for_deletion(connection_info) 481 self._remove_unexpected_client(unexpected_client_id) 482 else: 483 # клиент еще не подключен 484 expected_client_info.connection_id = None 485 expected_client_info._Client__connection = None 486 487 if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 488 self._connect_to_super_server(expected_client_info) 489 490 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 491 self._expected_clients[expected_client_info.id] = expected_client_info 492 493 return expected_client_info.id 494 495 def get_client_id_by_keyword(self, expected_client_keyword): 496 """ 497 :param expected_client_keyword: expected_client_keyword 498 :return: link to Client() 499 """ 500 return self._keywords_for_expected_clients[expected_client_keyword] 501 502 def get_client_info(self, expected_client_id): 503 """ 504 :param expected_client_id: expected_client_id 505 :return: link to Client() 506 """ 507 return self._expected_clients[expected_client_id] 508 509 def get_connection_input_fifo_size_for_client(self, expected_client_id): 510 expected_client_info = self._expected_clients[expected_client_id] 511 connection_info = expected_client_info._Client__connection 512 # connection_info = self._connections.get(expected_client_info.connection_id) 513 if connection_info is None: 514 raise Exception('Expected client was not connected yet!') 515 # if client_info.this_is_raw_connection: 516 # if client_info.input_from_client.size(): 517 # return 1 518 # else: 519 # return 0 520 # else: 521 # return client_info.input_from_client.size() 522 return connection_info.input_from_client.size() 523 524 def get_message_from_client(self, expected_client_id): 525 expected_client_info = self._expected_clients[expected_client_id] 526 connection_info = expected_client_info._Client__connection 527 # connection_info = self._connections.get(expected_client_info.connection_id) 528 if connection_info is None: 529 raise Exception('Expected client was not connected yet!') 530 if not connection_info.input_from_client.size(): 531 raise Exception('There is no readable data in expected client\'s FIFO!') 532 # if client_info.this_is_raw_connection: 533 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 534 return connection_info.input_from_client.get() 535 536 # @profile 537 def get_messages_from_client(self, expected_client_id): 538 expected_client_info = self._expected_clients[expected_client_id] 539 connection_info = expected_client_info._Client__connection 540 # connection_info = self._connections.get(expected_client_info.connection_id) 541 if connection_info is None: 542 raise Exception('Expected client was not connected yet!') 543 try: 544 while True: 545 yield connection_info.input_from_client.get() 546 except FIFOIsEmpty: 547 pass 548 # while client_info.input_from_client.size(): 549 # yield client_info.input_from_client.get() 550 551 def get_connection_output_fifo_size_for_client(self, expected_client_id): 552 expected_client_info = self._expected_clients[expected_client_id] 553 connection_info = expected_client_info._Client__connection 554 # connection_info = self._connections.get(expected_client_info.connection_id) 555 if connection_info is None: 556 raise Exception('Expected client was not connected yet!') 557 return connection_info.output_to_client.size() 558 559 def send_message_to_client(self, expected_client_id, data): 560 # data = bytes(data) 561 expected_client_info = self._expected_clients[expected_client_id] 562 connection_info = expected_client_info._Client__connection 563 # connection_info = self._connections.get(expected_client_info.connection_id) 564 if connection_info is None: 565 raise Exception('Expected client was not connected yet!') 566 if connection_info.this_is_raw_connection: 567 self._send_message_through_connection_raw(connection_info, data) 568 else: 569 self._send_message_through_connection(connection_info, data) 570 571 def send_messages_to_client(self, expected_client_id, messages_list): 572 # data = bytes(data) 573 expected_client_info = self._expected_clients[expected_client_id] 574 connection_info = expected_client_info._Client__connection 575 # connection_info = self._connections.get(expected_client_info.connection_id) 576 if connection_info is None: 577 raise Exception('Expected client was not connected yet!') 578 if connection_info.this_is_raw_connection: 579 self._send_messages_through_connection_raw(connection_info, messages_list) 580 else: 581 self._send_messages_through_connection(connection_info, messages_list) 582 583 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 584 expected_client_info = self._expected_clients[expected_client_id] 585 connection_info = expected_client_info._Client__connection 586 # connection_info = self._connections.get(expected_client_info.connection_id) 587 if connection_info is None: 588 raise Exception('Expected client was not connected yet!') 589 return connection_info.this_is_raw_connection 590 591 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 592 expected_client_info = self._expected_clients[expected_client_id] 593 connection_info = expected_client_info._Client__connection 594 # connection_info = self._connections.get(expected_client_info.connection_id) 595 if connection_info is None: 596 raise Exception('Expected client was not connected yet!') 597 connection_info.this_is_raw_connection = is_raw 598 599 def set_inline_processor_for_client(self, expected_client_id, 600 class_for_unknown_clients_inline_processing: type): 601 expected_client_info = self._expected_clients[expected_client_id] 602 connection_info = expected_client_info._Client__connection 603 # connection_info = self._connections.get(expected_client_info.connection_id) 604 if connection_info is None: 605 raise Exception('Expected client was not connected yet!') 606 self._set_inline_processor_for_client(connection_info, expected_client_info, 607 class_for_unknown_clients_inline_processing) 608 609 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 610 """ 611 Connection will be closed immediately (inside this method) 612 :param expected_client_id: 613 :param raise_if_already_closed: 614 :return: 615 """ 616 if __debug__: 617 self._log('CLOSE EXPECTED CLIENT SOCKET:') 618 expected_client_info = self._expected_clients[expected_client_id] 619 connection_info = expected_client_info._Client__connection 620 # connection_info = self._connections.get(expected_client_info.connection_id) 621 if raise_if_already_closed and (connection_info is None): 622 raise Exception('Expected client was not connected yet!') 623 self.close_connection(connection_info.id) 624 625 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 626 raise_if_already_closed=True): 627 """ 628 Connection will be closed immediately (inside main IO loop) 629 :param expected_client_id: 630 :param raise_if_already_closed: 631 :return: 632 """ 633 if __debug__: 634 self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 635 expected_client_info = self._expected_clients[expected_client_id] 636 connection_info = expected_client_info._Client__connection 637 # connection_info = self._connections.get(expected_client_info.connection_id) 638 if raise_if_already_closed and (connection_info is None): 639 raise Exception('Expected client was not connected yet!') 640 self._mark_connection_to_be_closed_immediately(connection_info) 641 642 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 643 """ 644 Connection will be closed when all output will be sent (inside main IO loop). 645 :param expected_client_id: 646 :param raise_if_already_closed: 647 :return: 648 """ 649 if __debug__: 650 self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 651 expected_client_info = self._expected_clients[expected_client_id] 652 connection_info = expected_client_info._Client__connection 653 # connection_info = self._connections.get(expected_client_info.connection_id) 654 if raise_if_already_closed and (connection_info is None): 655 raise Exception('Expected client was not connected yet!') 656 self._mark_connection_as_ready_to_be_closed(connection_info) 657 658 def remove_client(self, expected_client_id): 659 if __debug__: 660 self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 661 expected_client_info = self._expected_clients[expected_client_id] 662 if __debug__: 663 self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 664 connection_id = expected_client_info.connection_id 665 if connection_id is None: 666 self._remove_client(expected_client_id) 667 self.remove_connection(connection_id) 668 669 def add_connection(self, conn, address): 670 """ 671 :param conn: socket 672 :param address: address 673 :return: client ID 674 """ 675 if conn is None: 676 raise TypeError('conn should not be None!') 677 678 conn.setblocking(0) 679 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 680 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 681 682 new_client_id = self._connections_id_gen() 683 684 client_info = Connection(new_client_id, (conn, address), self) 685 self._connections[new_client_id] = client_info 686 self._connection_by_conn[conn] = new_client_id 687 self._input_check_sockets.add(conn) 688 self._exception_check_sockets.add(conn) 689 self._we_have_connections_for_select = True 690 691 self._unconfirmed_clients.add(conn) 692 693 return new_client_id 694 695 def close_connection(self, connection_id): 696 if __debug__: 697 self._log('CLOSE CLIENT {}:'.format(connection_id)) 698 client_info = self._connections[connection_id] 699 if not client_info.conn.existence: 700 if __debug__: 701 self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 702 return 703 conn = client_info.conn.result 704 if conn is None: 705 if __debug__: 706 self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 707 return 708 709 conn.close() 710 client_info.conn.existence = False 711 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 712 # memory even before destroying 713 714 if connection_id in self._connections_marked_as_ready_to_be_closed: 715 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 716 if conn in self._connections_marked_to_be_closed_immediately: 717 self._connections_marked_to_be_closed_immediately.remove(conn) 718 if conn in self._connection_by_conn: 719 del self._connection_by_conn[conn] 720 if conn in self._input_check_sockets: 721 self._input_check_sockets.remove(conn) 722 if conn in self._output_check_sockets: 723 self._output_check_sockets.remove(conn) 724 if conn in self._exception_check_sockets: 725 self._exception_check_sockets.remove(conn) 726 727 if not self._input_check_sockets: 728 self._we_have_connections_for_select = False 729 730 if __debug__: 731 self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id)) 732 733 def close_connection_by_conn(self, conn): 734 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 735 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 736 if __debug__: 737 self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 738 connection_id = self._connection_by_conn[conn] 739 if __debug__: 740 self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 741 self.close_connection(connection_id) 742 743 def remove_connection(self, connection_id): 744 # client should NOT be removed immediately after connection close (close_connection): 745 # code should do it by itself after reading all available input data 746 if __debug__: 747 self._log('REMOVE CLIENT: {}'.format(connection_id)) 748 client_info = self._connections[connection_id] 749 if __debug__: 750 self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 751 if client_info.conn.existence: 752 self.close_connection(connection_id) 753 conn = client_info.conn.result 754 if conn is None: 755 return 756 757 if conn in self._conns_of_unexpected_clients: 758 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 759 760 # if conn in self._conns_of_expected_clients: 761 # self._remove_client(self._conns_of_expected_clients[conn]) 762 if client_info.connected_expected_client_id is not None: 763 self._remove_client(client_info.connected_expected_client_id) 764 765 client_info.connected_expected_client_id = None 766 client_info.connected_expected_client = None 767 768 if connection_id in self._connections_marked_as_ready_to_be_deleted: 769 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 770 771 client_info.conn.existence = False 772 client_info.conn.result = None 773 774 del self._connections[connection_id] 775 if conn in self._connection_by_conn: 776 del self._connection_by_conn[conn] 777 if conn in self._input_check_sockets: 778 self._input_check_sockets.remove(conn) 779 if conn in self._output_check_sockets: 780 self._output_check_sockets.remove(conn) 781 if conn in self._exception_check_sockets: 782 self._exception_check_sockets.remove(conn) 783 784 # client_info.remove() 785 786 def remove_connection_by_conn(self, conn): 787 connection_id = self._connection_by_conn[conn] 788 self.remove_connection(connection_id) 789 790 def _log(self, log_string): 791 self._internal_log.append(log_string) 792 if self.echo_log: 793 print(log_string) 794 795 def _create_unknown_client_from_connection(self, client_info: Connection): 796 keyword = None 797 keyword_is_ok = False 798 while not keyword_is_ok: 799 keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode() 800 if keyword not in self._keywords_for_expected_clients: 801 keyword_is_ok = True 802 connection_settings = ConnectionSettings(connection_type=ConnectionType.active_accepted, keyword=keyword) 803 expected_client_info = Client(connection_settings) 804 expected_client_info.id = self._expected_clients_id_gen() 805 806 expected_client_info.connection_id = client_info.id 807 expected_client_info._Client__connection = client_info 808 expected_client_info.will_use_raw_client_connection = True 809 expected_client_info.will_use_raw_connection_without_handshake = True 810 expected_client_info.this_is_unknown_client = True 811 if self.class_for_unknown_clients_inline_processing is not None: 812 self._set_inline_processor_for_client(client_info, expected_client_info, 813 self.class_for_unknown_clients_inline_processing) 814 # assert type(self.class_for_unknown_clients_inline_processing) == type, \ 815 # '.class_for_unknown_clients_inline_processing must be a class or None' 816 # expected_client_info.obj_for_inline_processing = self.class_for_unknown_clients_inline_processing( 817 # keyword, client_info.conn.result.family, client_info.conn.result.type, client_info.conn.result.proto, 818 # copy.copy(client_info.addr_info), copy.copy(client_info.host_names) 819 # ) 820 # client_info.has_inline_processor = True 821 # self._inline_processor__init_parameters(client_info, expected_client_info) 822 823 self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id 824 client_info.connected_expected_client_id = expected_client_info.id 825 client_info.connected_expected_client = expected_client_info 826 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 827 self._expected_clients[expected_client_info.id] = expected_client_info 828 829 self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id) 830 831 return expected_client_info.id 832 833 def _set_inline_processor_for_client(self, connection_info: Connection, 834 expected_client_info: Client, 835 class_for_unknown_clients_inline_processing: type): 836 assert type(class_for_unknown_clients_inline_processing) == type, \ 837 '.class_for_unknown_clients_inline_processing must be a class or None' 838 keyword = expected_client_info.connection_settings.keyword 839 expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing( 840 expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type, 841 connection_info.conn.result.proto, copy.copy(connection_info.addr_info), 842 copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters 843 ) 844 connection_info.has_inline_processor = True 845 self._inline_processor__init_parameters(connection_info, expected_client_info) 846 847 def _connect_to_super_server(self, expected_client_info: Client): 848 """ 849 Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()). 850 Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую 851 логику через select/poll/epoll: 852 http://man7.org/linux/man-pages/man2/connect.2.html 853 :param expected_client_info: 854 :return: 855 """ 856 connection_settings = expected_client_info.connection_settings 857 conn = None 858 try: 859 conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type, 860 connection_settings.socket_protocol, connection_settings.socket_fileno) 861 # conn.setblocking(0) 862 except (socket.error, OSError) as err: 863 if __debug__: 864 self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format( 865 err.errno, err.strerror)) 866 raise 867 868 try: 869 conn.connect(connection_settings.socket_address) 870 except (socket.error, OSError) as err: 871 conn.close() 872 if __debug__: 873 self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format( 874 connection_settings.socket_address, err.errno, err.strerror)) 875 raise 876 877 super_server_client_id = self.add_connection(conn, connection_settings.socket_address) 878 879 addr_info = host_names = None 880 try: 881 if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS): 882 addr_info = socket.getaddrinfo(connection_settings.socket_address[0], 883 connection_settings.socket_address[1]) 884 host_names = socket.gethostbyaddr(connection_settings.socket_address[0]) 885 except ConnectionError as err: 886 # An established connection was aborted by the software in your host machine 887 if __debug__: 888 self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 889 if __debug__: 890 self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 891 connection_settings.socket_address, err.errno, err.strerror)) 892 self._mark_connection_to_be_closed_immediately(super_server_client_id) 893 ok = False 894 except (socket.error, OSError) as err: 895 if __debug__: 896 self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 897 connection_settings.socket_address, err.errno, err.strerror)) 898 if err.errno in SET_OF_CONNECTION_ERRORS: 899 # An established connection was aborted by the software in your host machine 900 if __debug__: 901 self._log( 902 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 903 if __debug__: 904 self._log( 905 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 906 connection_settings.socket_address, err.errno, err.strerror)) 907 self._mark_connection_to_be_closed_immediately(super_server_client_id) 908 ok = False 909 else: 910 if 'nt' == os.name: 911 if errno.WSAECONNRESET == err.errno: 912 # An existing connection was forcibly closed by the remote host 913 if __debug__: 914 self._log( 915 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 916 if __debug__: 917 self._log( 918 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 919 connection_settings.socket_address, err.errno, err.strerror)) 920 self._mark_connection_to_be_closed_immediately(super_server_client_id) 921 ok = False 922 else: 923 raise err 924 else: 925 raise err 926 927 super_server_client_info = self._connections[super_server_client_id] 928 super_server_client_info.addr_info = addr_info 929 super_server_client_info.host_names = host_names 930 self._log_new_connection(super_server_client_info, False) 931 932 if expected_client_info.will_use_raw_connection_without_handshake: 933 # Connection is made without handshake 934 super_server_client_info.this_is_raw_connection = True 935 self._unconfirmed_clients.remove(super_server_client_info.conn.result) 936 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 937 else: 938 keyword = expected_client_info.connection_settings.keyword 939 if keyword is None: 940 keyword = self._get_own_keyword_appropriate_for_connection(conn) 941 if keyword is None: 942 # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления 943 self._mark_connection_to_be_closed_immediately(super_server_client_info) 944 self._mark_connection_as_ready_for_deletion(super_server_client_info) 945 raise Exception('Own keyword should be provided in connection_settings!') 946 if keyword: 947 # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу). 948 self._send_message_through_connection(super_server_client_info, keyword) 949 950 expected_client_info.connection_id = super_server_client_id 951 expected_client_info._Client__connection = super_server_client_info 952 self._conns_of_expected_clients[conn] = expected_client_info.id 953 super_server_client_info.connected_expected_client_id = expected_client_info.id 954 super_server_client_info.connected_expected_client = expected_client_info 955 956 def _get_own_keyword_appropriate_for_connection(self, conn): 957 keyword = None 958 random_keyword = None 959 for gate_connection_settings in self.gates_connections_settings: 960 random_keyword = gate_connection_settings.keyword 961 if (conn.family == gate_connection_settings.socket_family) and ( 962 conn.type == gate_connection_settings.socket_type) and \ 963 (conn.proto == gate_connection_settings.socket_protocol): 964 keyword = gate_connection_settings.keyword 965 if keyword is None: 966 keyword = random_keyword 967 return keyword 968 969 def _pack_message(self, data): 970 return len(data).to_bytes(self.message_size_len, 'little') + data 971 972 def _remove_client(self, expected_client_id): 973 expected_client_info = self._expected_clients[expected_client_id] 974 del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] 975 del self._expected_clients[expected_client_id] 976 connection_info = expected_client_info._Client__connection 977 # connection_info = self._connections.get(expected_client_info.connection_id) 978 if connection_info is not None: 979 # you can remove expected client before it will be connected to the server 980 del self._conns_of_expected_clients[connection_info.conn.result] 981 expected_client_info.connection_id = None 982 expected_client_info._Client__connection = None 983 984 def _add_unexpected_client(self, connection_id, keyword): 985 connection_settings = ConnectionSettings(connection_type=ConnectionType.active_accepted, keyword=keyword) 986 unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(), 987 connection_id) 988 989 self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info 990 self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id 991 # connection_info = self._connections[unexpected_client_info.connection_id] 992 connection_info = unexpected_client_info._Client__connection 993 # connection_info = self._connections.get(unexpected_client_info.connection_id) 994 self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id 995 996 return unexpected_client_info.id 997 998 def _remove_unexpected_client(self, unexpected_client_id): 999 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 1000 # connection_info = self._connections[unexpected_client_info.connection_id] # unexpected client appears only after 1001 # connection 1002 connection_info = unexpected_client_info._Client__connection # unexpected client appears only after 1003 # connection 1004 # connection_info = self._connections.get(unexpected_client_info.connection_id) # unexpected client appears 1005 # # only after connection 1006 del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword] 1007 del self._unexpected_clients[unexpected_client_id] 1008 del self._conns_of_unexpected_clients[connection_info.conn.result] 1009 unexpected_client_info.connection_id = None 1010 unexpected_client_info._Client__connection = None 1011 1012 # @profile 1013 def _accept_new_connection(self, readable_socket: socket.socket): 1014 # One of a "readable" self._gate sockets is ready to accept a connection 1015 ok = True 1016 while ok: 1017 connection_id = None 1018 client_info = None 1019 1020 connection = None 1021 client_address = None 1022 try: 1023 connection, client_address = readable_socket.accept() 1024 connection_id = self.add_connection(connection, client_address) 1025 client_info = self._connections[connection_id] 1026 except BlockingIOError as err: 1027 ok = False 1028 except InterruptedError as err: 1029 pass 1030 except (socket.error, OSError) as err: 1031 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1032 ok = False 1033 elif errno.EINTR == err.errno: 1034 pass 1035 elif errno.ECONNABORTED == err.errno: 1036 ok = False 1037 elif errno.EMFILE == err.errno: 1038 if __debug__: 1039 self._log( 1040 'The per-process limit on the number of open file descriptors had been reached.') 1041 ok = False 1042 elif errno.ENFILE == err.errno: 1043 if __debug__: 1044 self._log( 1045 'The system-wide limit on the total number of open files had been reached.') 1046 ok = False 1047 elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno): 1048 if __debug__: 1049 self._log( 1050 'Not enough free memory. Allocation is limited by the socket buffer limits.') 1051 ok = False 1052 elif errno.EPROTO == err.errno: 1053 if __debug__: 1054 self._log('Protocol error.') 1055 ok = False 1056 elif errno.EPERM == err.errno: 1057 if __debug__: 1058 self._log('Firewall rules forbid connection.') 1059 ok = False 1060 else: 1061 raise err 1062 1063 if (connection is not None) and (client_info is not None): 1064 addr_info = host_names = None 1065 try: 1066 if self.should_get_client_addr_info_on_connection and \ 1067 (connection.family in INET_TYPE_CONNECTIONS): 1068 addr_info = socket.getaddrinfo(client_address[0], client_address[1]) 1069 host_names = socket.gethostbyaddr(client_address[0]) 1070 except ConnectionError as err: 1071 # An established connection was aborted by the software in your host machine 1072 if __debug__: 1073 self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1074 if __debug__: 1075 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1076 client_address, err.errno, err.strerror)) 1077 self._mark_connection_to_be_closed_immediately(client_info) 1078 ok = False 1079 except (socket.error, OSError) as err: 1080 if __debug__: 1081 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1082 client_address, err.errno, err.strerror)) 1083 if err.errno in SET_OF_CONNECTION_ERRORS: 1084 # An established connection was aborted by the software in your host machine 1085 if __debug__: 1086 self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1087 if __debug__: 1088 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1089 client_address, err.errno, err.strerror)) 1090 self._mark_connection_to_be_closed_immediately(client_info) 1091 ok = False 1092 else: 1093 if 'nt' == os.name: 1094 if errno.WSAECONNRESET == err.errno: 1095 # An existing connection was forcibly closed by the remote host 1096 if __debug__: 1097 self._log( 1098 'CLOSING {}: Connection reset by peer'.format(client_address)) 1099 if __debug__: 1100 self._log( 1101 'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1102 client_address, err.errno, err.strerror)) 1103 self._mark_connection_to_be_closed_immediately(client_info) 1104 ok = False 1105 else: 1106 raise err 1107 else: 1108 raise err 1109 1110 client_info.addr_info = addr_info 1111 client_info.host_names = host_names 1112 self._log_new_connection(client_info, True) 1113 1114 if self.need_to_auto_check_incoming_raw_connection \ 1115 and self.raw_checker_for_new_incoming_connections(self, client_info): 1116 # Is should be RAW: 1117 if self.unknown_clients_are_allowed: 1118 # Unknown clients are allowed - create Unknown Expected Client for this connection 1119 client_info.this_is_raw_connection = True 1120 self._unconfirmed_clients.remove(client_info.conn.result) 1121 self._create_unknown_client_from_connection(client_info) 1122 else: 1123 # Unknown clients are Non allowed - close connection. 1124 if __debug__: 1125 self._log( 1126 'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format( 1127 client_info.addr.result 1128 )) 1129 self._mark_connection_to_be_closed_immediately(client_info) 1130 1131 # @profile 1132 def _read_data_from_already_connected_socket__inner__memory_optimized(self, connection_info: Connection, 1133 possible_messages_in_client_input_fifo=False): 1134 ok = True 1135 1136 data = connection_info.conn.result.recv(connection_info.recv_buff_size) 1137 data_len = len(data) 1138 # self.recv_buff_sizes.append(connection_info.recv_buff_size) 1139 # self.recv_sizes.append(data_len) 1140 connection_info.calc_new_recv_buff_size(data_len) 1141 # if __debug__: self._log('received "{}" from {}'.format( 1142 # data, connection_info.addr.result)) 1143 if data: 1144 # if data_len > 4096: 1145 # data = memoryview(data) 1146 data = memoryview(data) 1147 if connection_info.this_is_raw_connection: 1148 connection_info.input_from_client.put(data) 1149 possible_messages_in_client_input_fifo = True 1150 else: 1151 connection_info.raw_input_from_client.add_piece_of_data(data) 1152 # possible_messages_in_client_input_fifo = True 1153 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1154 connection_info) 1155 else: 1156 # Interpret empty result as closed connection 1157 if __debug__: 1158 self._log( 1159 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1160 self._mark_connection_to_be_closed_immediately(connection_info) 1161 ok = False 1162 1163 result = (ok, possible_messages_in_client_input_fifo) 1164 return result 1165 1166 # @profile 1167 def _read_data_from_already_connected_socket__inner__speed_optimized(self, connection_info: Connection, 1168 possible_messages_in_client_input_fifo=False): 1169 # print(current_line()) 1170 ok = True 1171 1172 if connection_info.current_input_memoryview: 1173 # print(current_line()) 1174 nbytes = connection_info.conn.result.recv_into(connection_info.current_input_memoryview) 1175 # print('len(connection_info.current_input_memoryview):', len(connection_info.current_input_memoryview)) 1176 # print('nbytes', nbytes) 1177 # self.recv_buff_sizes.append(len(connection_info.current_input_memoryview)) 1178 # self.recv_sizes.append(nbytes) 1179 if nbytes > 0: 1180 # print(current_line()) 1181 # data = bytes(connection_info.current_input_memoryview[:nbytes]) 1182 # res_data = bytes(connection_info.current_input_memoryview[nbytes:]) 1183 data = connection_info.current_input_memoryview[:nbytes] 1184 connection_info.current_input_memoryview = connection_info.current_input_memoryview[nbytes:] 1185 1186 if connection_info.this_is_raw_connection: 1187 # print(current_line()) 1188 connection_info.input_from_client.put(data) 1189 possible_messages_in_client_input_fifo = True 1190 else: 1191 # print(current_line()) 1192 connection_info.raw_input_from_client.add_piece_of_data(data) 1193 # possible_messages_in_client_input_fifo = True 1194 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1195 connection_info) 1196 else: 1197 # print(current_line()) 1198 # Interpret empty result as closed connection 1199 if __debug__: 1200 self._log( 1201 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1202 self._mark_connection_to_be_closed_immediately(connection_info) 1203 ok = False 1204 else: 1205 connection_info.current_input_memoryview = memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1206 1207 result = (ok, possible_messages_in_client_input_fifo) 1208 return result 1209 1210 # @profile 1211 def _read_data_from_already_connected_socket__inner__speed_optimized_for_messages( 1212 self, connection_info: Connection, possible_messages_in_client_input_fifo=False): 1213 ok = True 1214 1215 if connection_info.current_input_memoryview: 1216 nbytes = connection_info.conn.result.recv_into(connection_info.current_input_memoryview) 1217 if nbytes > 0: 1218 # data = connection_info.current_input_memoryview[:nbytes] 1219 connection_info.current_input_memoryview = connection_info.current_input_memoryview[nbytes:] 1220 connection_info.current_input_memoryview_nbytes += nbytes 1221 connection_info.current_input_memoryview_diff += nbytes 1222 1223 if connection_info.this_is_raw_connection: 1224 data = connection_info.current_input_memoryview[:nbytes] 1225 connection_info.input_from_client.put(data) 1226 possible_messages_in_client_input_fifo = True 1227 else: 1228 # connection_info.raw_input_from_client.add_piece_of_data(data) 1229 # possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1230 # connection_info) 1231 possible_messages_in_client_input_fifo = self._read_messages_from_input_memoryview_into_raw_input( 1232 connection_info) 1233 # if not connection_info.current_input_memoryview: 1234 # connection_info.raw_input_from_client.add_piece_of_data( 1235 # memoryview(connection_info.current_input_memoryview.obj)[ 1236 # connection_info.current_input_memoryview_offset:]) 1237 # # Закоментировано, т.к. будет вызвано при следующем чтении (см. код ниже): 1238 # # connection_info.current_input_memoryview = \ 1239 # # memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1240 # # connection_info.current_input_memoryview_offset = 0 1241 # # connection_info.current_input_memoryview_nbytes = 0 1242 # # connection_info.current_input_memoryview_diff = 0 1243 else: 1244 if __debug__: 1245 self._log( 1246 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1247 self._mark_connection_to_be_closed_immediately(connection_info) 1248 ok = False 1249 else: 1250 connection_info.current_input_memoryview = memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1251 connection_info.current_input_memoryview_offset = 0 1252 connection_info.current_input_memoryview_nbytes = 0 1253 connection_info.current_input_memoryview_diff = 0 1254 1255 result = (ok, possible_messages_in_client_input_fifo) 1256 return result 1257 1258 # @profile 1259 def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket): 1260 # print(current_line()) 1261 possible_messages_in_client_input_fifo = False 1262 1263 curr_client_id = self._connection_by_conn[readable_socket] 1264 connection_info = self._connections[curr_client_id] 1265 1266 ok = True 1267 while ok: 1268 # print(current_line()) 1269 try: 1270 if self.use_speed_optimized_socket_read: 1271 ok, possible_messages_in_client_input_fifo = \ 1272 self._read_data_from_already_connected_socket__inner__speed_optimized( 1273 connection_info, possible_messages_in_client_input_fifo) 1274 else: 1275 ok, possible_messages_in_client_input_fifo = \ 1276 self._read_data_from_already_connected_socket__inner__memory_optimized( 1277 connection_info, possible_messages_in_client_input_fifo) 1278 break # makes IO faster on 10-30% on all modes (message/raw, data/http, 1279 # static_read_buffer/non_static_read_buffer). 1280 # Ускорение происходит за счет того, что при таком разбиении ввод и вывод начинают происходить (на 1281 # уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и пустили в обработку. В это 1282 # время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы. После обработки мы пускаем 1283 # данные на отправку и забираем очередную порцию данных из входного буфера. В это время происходит 1284 # отправка данных из буферов на уровне ОС. И т.д. 1285 except BlockingIOError as err: 1286 # print(current_line()) 1287 ok = False 1288 except InterruptedError as err: 1289 # print(current_line()) 1290 pass 1291 except ConnectionError as err: 1292 # print(current_line()) 1293 # An established connection was aborted by the software in your host machine 1294 if __debug__: 1295 self._log('CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1296 if __debug__: 1297 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1298 connection_info.addr.result, err.errno, err.strerror)) 1299 self._mark_connection_to_be_closed_immediately(connection_info) 1300 ok = False 1301 except (socket.error, OSError) as err: 1302 # print(current_line()) 1303 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1304 # print(current_line()) 1305 ok = False 1306 elif errno.EINTR == err.errno: 1307 # print(current_line()) 1308 pass 1309 elif err.errno in SET_OF_CONNECTION_ERRORS: 1310 # print(current_line()) 1311 # An established connection was aborted by the software in your host machine 1312 if __debug__: 1313 self._log('CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1314 if __debug__: 1315 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1316 connection_info.addr.result, err.errno, err.strerror)) 1317 self._mark_connection_to_be_closed_immediately(connection_info) 1318 ok = False 1319 else: 1320 # print(current_line()) 1321 if 'nt' == os.name: 1322 # print(current_line()) 1323 if errno.WSAECONNRESET == err.errno: 1324 # print(current_line()) 1325 # An existing connection was forcibly closed by the remote host 1326 if __debug__: 1327 self._log( 1328 'CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1329 if __debug__: 1330 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1331 connection_info.addr.result, err.errno, err.strerror)) 1332 self._mark_connection_to_be_closed_immediately(connection_info) 1333 ok = False 1334 else: 1335 # print(current_line()) 1336 raise err 1337 else: 1338 # print(current_line()) 1339 raise err 1340 1341 read_is_forbidden = False 1342 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 1343 > self.global_in__data_size_limit.result: 1344 read_is_forbidden = True 1345 ok = False 1346 1347 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 1348 read_is_forbidden) 1349 if read_is_forbidden_test is not None: 1350 if read_is_forbidden_test: 1351 print('Read is suppressed until data will be processed.') 1352 else: 1353 print('Read is allowed: data is processed.') 1354 1355 return possible_messages_in_client_input_fifo 1356 1357 # @profile 1358 def _read_data_from_socket(self, readable_socket: socket.socket): 1359 possible_messages_in_client_input_fifo = False 1360 if readable_socket in self._gate: 1361 accept_is_forbidden = True 1362 if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \ 1363 <= self.global__data_size_limit.result: 1364 accept_is_forbidden = False 1365 self._accept_new_connection(readable_socket) 1366 1367 if __debug__: 1368 accept_is_forbidden_test = \ 1369 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger( 1370 accept_is_forbidden) 1371 if accept_is_forbidden_test is not None: 1372 if accept_is_forbidden_test: 1373 print('Accept is suppressed until data will be processed and/or out.') 1374 else: 1375 print('Accept is allowed: data is processed and/or out.') 1376 else: 1377 possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell( 1378 readable_socket) 1379 1380 return possible_messages_in_client_input_fifo 1381 1382 # def _log_new_connection(self, connection, client_address, is_incoming_connection): 1383 def _log_new_connection(self, client_info: Connection, is_incoming_connection): 1384 connection = client_info.conn.result 1385 client_address = client_info.addr.result 1386 addr_info = client_info.addr_info 1387 host_names = client_info.host_names 1388 1389 connection_type_string = 'OUTGOING' 1390 from_to = 'to' 1391 if is_incoming_connection: 1392 connection_type_string = 'INCOMING' 1393 from_to = 'from' 1394 1395 if __debug__: 1396 self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address)) 1397 if connection.family in {socket.AF_INET, socket.AF_INET6}: 1398 if __debug__: 1399 self._log('\taddr_info: {}'.format(addr_info)) 1400 if __debug__: 1401 self._log('\thost_names: {}'.format(host_names)) 1402 1403 # # @profile 1404 # def _write_data_to_socket(self, writable_socket: socket.socket): 1405 # # Data read from already connected client 1406 # # curr_client_id = self._connection_by_conn[writable_socket] 1407 # # curr_client_info = self._connections[curr_client_id] 1408 # curr_client_info = self._connections[self._connection_by_conn[writable_socket]] 1409 # if curr_client_info.should_be_closed: 1410 # curr_client_info.current_output_memoryview = None 1411 # curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1412 # return 1413 # 1414 # ok = True 1415 # first_pass = True 1416 # can_call__inline_processor__on__output_buffers_are_empty = True 1417 # while ok: 1418 # try: 1419 # if curr_client_info.current_output_memoryview: 1420 # nsent = writable_socket.send(curr_client_info.current_output_memoryview) 1421 # # if __debug__: self._log('sending "{}" to {}'.format( 1422 # # curr_client_info.current_output_memoryview[:nsent], curr_client_info.addr.result)) 1423 # curr_client_info.current_output_memoryview = curr_client_info.current_output_memoryview[nsent:] 1424 # else: 1425 # curr_client_info.current_output_memoryview = None 1426 # # self._move_message_from_fifo_to_memoryview(curr_client_info) 1427 # # self._consolidate_and_move_messages_from_fifo_to_memoryview(curr_client_info) 1428 # output_fifo_size = curr_client_info.output_to_client.size() 1429 # if output_fifo_size > 1: 1430 # result_data, result_size, result_qnt = \ 1431 # curr_client_info.output_to_client.get_at_least_size(524288) 1432 # if result_qnt > 1: 1433 # curr_client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1434 # else: 1435 # curr_client_info.current_output_memoryview = memoryview(result_data.popleft()) 1436 # elif output_fifo_size == 1: 1437 # curr_client_info.current_output_memoryview = memoryview(curr_client_info.output_to_client.get()) 1438 # 1439 # if curr_client_info.current_output_memoryview is None: 1440 # if not curr_client_info.ready_to_be_closed: 1441 # # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1442 # # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1443 # # сохранить сокет в списке проверяемых для отправки. 1444 # self._output_check_sockets.remove(writable_socket) 1445 # if first_pass and curr_client_info.ready_to_be_closed: 1446 # # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1447 # # Это значит что все данные были отправлены, и можно закрывать соединение. 1448 # self._output_check_sockets.remove(writable_socket) 1449 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1450 # # if curr_client_info.ready_to_be_closed: 1451 # # self._mark_connection_to_be_closed_immediately(curr_client_info) 1452 # # if writable_socket in self._conns_of_expected_clients: 1453 # # expected_client_id = self._conns_of_expected_clients[writable_socket] 1454 # # self._io_iteration_result.clients_with_empty_output_fifo.add(expected_client_id) 1455 # ok = False 1456 # if curr_client_info.connected_expected_client_id is not None: 1457 # self._io_iteration_result.clients_with_empty_output_fifo.add( 1458 # curr_client_info.connected_expected_client_id) 1459 # if curr_client_info.has_inline_processor: 1460 # if can_call__inline_processor__on__output_buffers_are_empty: 1461 # if self._inline_processor__on__output_buffers_are_empty( 1462 # curr_client_info, curr_client_info.connected_expected_client): 1463 # ok = True 1464 # can_call__inline_processor__on__output_buffers_are_empty = False 1465 # except BlockingIOError as err: 1466 # ok = False 1467 # except InterruptedError as err: 1468 # pass 1469 # except ConnectionError as err: 1470 # # An established connection was aborted by the software in your host machine 1471 # if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1472 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1473 # curr_client_info.addr.result, err.errno, err.strerror)) 1474 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1475 # ok = False 1476 # except (socket.error, OSError) as err: 1477 # if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1478 # ok = False 1479 # elif errno.EINTR == err.errno: 1480 # pass 1481 # elif err.errno in SET_OF_CONNECTION_ERRORS: 1482 # # Connection reset by peer 1483 # if __debug__: self._log('CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1484 # err.strerror)) 1485 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1486 # curr_client_info.addr.result, err.errno, err.strerror)) 1487 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1488 # ok = False 1489 # else: 1490 # if 'nt' == os.name: 1491 # if errno.WSAECONNRESET == err.errno: 1492 # # An existing connection was forcibly closed by the remote host 1493 # if __debug__: self._log( 1494 # 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1495 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1496 # curr_client_info.addr.result, err.errno, err.strerror)) 1497 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1498 # ok = False 1499 # else: 1500 # raise err 1501 # else: 1502 # raise err 1503 # first_pass = False 1504 1505 # @profile 1506 def _write_data_to_socket(self, curr_client_info: Connection): 1507 # CAUTION: code here is optimized for speed - not for readability or beauty. 1508 1509 expected_client_info = curr_client_info.connected_expected_client 1510 writable_socket = curr_client_info.conn.result 1511 if curr_client_info.should_be_closed: 1512 curr_client_info.current_output_memoryview = None 1513 curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1514 return 1515 1516 ok = True 1517 first_pass = True 1518 can_call__inline_processor__on__output_buffers_are_empty = True 1519 while ok: 1520 try: 1521 if curr_client_info.current_output_memoryview: 1522 nsent = writable_socket.send(curr_client_info.current_output_memoryview) 1523 # if __debug__: self._log('sending "{}" to {}'.format( 1524 # curr_client_info.current_output_memoryview[:nsent], curr_client_info.addr.result)) 1525 curr_client_info.current_output_memoryview = curr_client_info.current_output_memoryview[nsent:] 1526 else: 1527 curr_client_info.current_output_memoryview = None 1528 # self._move_message_from_fifo_to_memoryview(curr_client_info) 1529 # self._consolidate_and_move_messages_from_fifo_to_memoryview(curr_client_info) 1530 output_fifo_size = curr_client_info.output_to_client.size() 1531 if output_fifo_size > 1: 1532 result_data, result_size, result_qnt = \ 1533 curr_client_info.output_to_client.get_at_least_size(524288) 1534 if result_qnt > 1: 1535 curr_client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1536 else: 1537 curr_client_info.current_output_memoryview = memoryview(result_data.popleft()) 1538 elif output_fifo_size == 1: 1539 curr_client_info.current_output_memoryview = memoryview(curr_client_info.output_to_client.get()) 1540 1541 if curr_client_info.current_output_memoryview is None: 1542 if not curr_client_info.ready_to_be_closed: 1543 # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1544 # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1545 # сохранить сокет в списке проверяемых для отправки. 1546 self._output_check_sockets.remove(writable_socket) 1547 if first_pass and curr_client_info.ready_to_be_closed: 1548 # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1549 # Это значит что все данные были отправлены, и можно закрывать соединение. 1550 self._output_check_sockets.remove(writable_socket) 1551 self._mark_connection_to_be_closed_immediately(curr_client_info) 1552 # if curr_client_info.ready_to_be_closed: 1553 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1554 # if writable_socket in self._conns_of_expected_clients: 1555 # expected_client_id = self._conns_of_expected_clients[writable_socket] 1556 # self._io_iteration_result.clients_with_empty_output_fifo.add(expected_client_id) 1557 ok = False 1558 if expected_client_info: 1559 self._io_iteration_result.clients_with_empty_output_fifo.add( 1560 curr_client_info.connected_expected_client_id) 1561 if curr_client_info.has_inline_processor: 1562 if can_call__inline_processor__on__output_buffers_are_empty: 1563 if self._inline_processor__on__output_buffers_are_empty(curr_client_info, 1564 expected_client_info): 1565 ok = True 1566 can_call__inline_processor__on__output_buffers_are_empty = False 1567 except BlockingIOError as err: 1568 ok = False 1569 except InterruptedError as err: 1570 pass 1571 except ConnectionError as err: 1572 # An established connection was aborted by the software in your host machine 1573 if __debug__: 1574 self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1575 if __debug__: 1576 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1577 curr_client_info.addr.result, err.errno, err.strerror)) 1578 self._mark_connection_to_be_closed_immediately(curr_client_info) 1579 ok = False 1580 except (socket.error, OSError) as err: 1581 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1582 ok = False 1583 elif errno.EINTR == err.errno: 1584 pass 1585 elif err.errno in SET_OF_CONNECTION_ERRORS: 1586 # Connection reset by peer 1587 if __debug__: 1588 self._log( 1589 'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1590 err.strerror)) 1591 if __debug__: 1592 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1593 curr_client_info.addr.result, err.errno, err.strerror)) 1594 self._mark_connection_to_be_closed_immediately(curr_client_info) 1595 ok = False 1596 else: 1597 if 'nt' == os.name: 1598 if errno.WSAECONNRESET == err.errno: 1599 # An existing connection was forcibly closed by the remote host 1600 if __debug__: 1601 self._log( 1602 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1603 if __debug__: 1604 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1605 curr_client_info.addr.result, err.errno, err.strerror)) 1606 self._mark_connection_to_be_closed_immediately(curr_client_info) 1607 ok = False 1608 else: 1609 raise err 1610 else: 1611 raise err 1612 first_pass = False 1613 1614 def _mark_connection_to_be_closed_immediately(self, client_info: Connection): 1615 client_info.should_be_closed = True 1616 self._connections_marked_to_be_closed_immediately.add(client_info.conn.result) 1617 # if client_info.conn.result in self._conns_of_expected_clients: 1618 # expected_client_id = self._conns_of_expected_clients[client_info.conn.result] 1619 # self._io_iteration_result.clients_with_disconnected_connection.add(expected_client_id) 1620 if client_info.connected_expected_client_id is not None: 1621 self._io_iteration_result.clients_with_disconnected_connection.add( 1622 client_info.connected_expected_client_id) 1623 1624 def _mark_connection_as_ready_to_be_closed(self, client_info: Connection): 1625 client_info.ready_to_be_closed = True 1626 1627 def _mark_connection_as_ready_for_deletion(self, client_info: Connection): 1628 client_info.ready_for_deletion = True 1629 self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result) 1630 1631 def _handle_connection_error(self, writable_socket: socket.socket): 1632 # Data read from already connected client 1633 curr_client_id = self._connection_by_conn[writable_socket] 1634 curr_client_info = self._connections[curr_client_id] 1635 if curr_client_info.should_be_closed: 1636 return 1637 if __debug__: 1638 self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result)) 1639 self._mark_connection_to_be_closed_immediately(curr_client_info) 1640 1641 # # @profile 1642 # def _read_messages_from_raw_input_into_fifo__cheat(self, readable_socket: socket.socket): 1643 # result = False 1644 # curr_client_id = self._connection_by_conn[readable_socket] 1645 # curr_client_info = self._connections[curr_client_id] 1646 # 1647 # for index in range(curr_client_info.raw_input_from_client.qnt()): 1648 # curr_client_info.input_from_client.put(curr_client_info.raw_input_from_client._data.popleft()) 1649 # result = True 1650 # # for piece in curr_client_info.raw_input_from_client._data: 1651 # # curr_client_info.input_from_client.put(piece) 1652 # # result = True 1653 # 1654 # if result: 1655 # # curr_client_info.raw_input_from_client = DynamicListOfPieces( 1656 # # curr_client_info.raw_input_from_client._joiner, curr_client_info.raw_input_from_client._offset_limit) 1657 # curr_client_info.raw_input_from_client = copy.copy(curr_client_info.raw_input_from_client) 1658 # 1659 # return result 1660 1661 # # @profile 1662 # def _read_messages_from_raw_input_into_fifo(self, readable_socket: socket.socket): 1663 # result = False 1664 # curr_client_id = self._connection_by_conn[readable_socket] 1665 # curr_client_info = self._connections[curr_client_id] 1666 # 1667 # if curr_client_info.this_is_raw_connection: 1668 # if curr_client_info.input_from_client.size(): 1669 # result = True 1670 # return result 1671 # 1672 # while True: 1673 # if curr_client_info.current_message_length is None: 1674 # current_message_length = curr_client_info.raw_input_from_client.read_data(self.message_size_len) 1675 # if current_message_length is not None: 1676 # curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1677 # if curr_client_info.current_message_length is not None: 1678 # current_message = curr_client_info.raw_input_from_client.get_data( 1679 # self.message_size_len + curr_client_info.current_message_length) 1680 # if current_message is not None: 1681 # curr_client_info.current_message_length = None 1682 # curr_client_info.input_from_client.put(current_message[self.message_size_len:]) 1683 # result = True 1684 # else: 1685 # break 1686 # else: 1687 # break 1688 # return result 1689 1690 # @profile 1691 # def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection): 1692 # result = False 1693 # if curr_client_info.this_is_raw_connection: 1694 # if curr_client_info.input_from_client.size(): 1695 # # print('curr_client_info.input_from_client.size(): ', curr_client_info.input_from_client.size()) 1696 # result = True 1697 # return result 1698 # 1699 # while True: 1700 # if curr_client_info.current_message_length is None: 1701 # current_message_length = curr_client_info.raw_input_from_client.read_data(self.message_size_len) 1702 # if current_message_length is not None: 1703 # curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1704 # if curr_client_info.current_message_length is not None: 1705 # current_message = curr_client_info.raw_input_from_client.get_data( 1706 # self.message_size_len + curr_client_info.current_message_length) 1707 # if current_message is not None: 1708 # curr_client_info.current_message_length = None 1709 # curr_client_info.input_from_client.put(current_message[self.message_size_len:]) 1710 # result = True 1711 # else: 1712 # break 1713 # else: 1714 # break 1715 # return result 1716 1717 # @profile 1718 def _read_messages_from_raw_input_into_fifo(self, connection_info: Connection) -> bool: 1719 result = False 1720 # if connection_info.this_is_raw_connection: 1721 # if connection_info.input_from_client.size(): 1722 # # print('connection_info.input_from_client.size(): ', connection_info.input_from_client.size()) 1723 # result = True 1724 # return result 1725 1726 try: 1727 while True: 1728 if connection_info.current_message_length is None: 1729 current_message_length = connection_info.raw_input_from_client.get_data(self.message_size_len) 1730 connection_info.current_message_length = int.from_bytes(current_message_length, 'little') 1731 1732 current_message = connection_info.raw_input_from_client.get_data( 1733 connection_info.current_message_length) 1734 if current_message is None: 1735 break 1736 else: 1737 connection_info.input_from_client.put(current_message) 1738 connection_info.current_message_length = None 1739 result = True 1740 except TypeError: 1741 # TODO: what it means? 1742 pass 1743 1744 return result 1745 1746 def _read_messages_from_input_memoryview_into_raw_input(self, connection_info: Connection) -> bool: 1747 result = False 1748 1749 while True: 1750 if connection_info.current_message_length is None: 1751 if connection_info.current_input_memoryview_diff >= self.message_size_len: 1752 current_message_length = memoryview(connection_info.current_input_memoryview.obj)[ 1753 connection_info.current_input_memoryview_offset:self.message_size_len] 1754 connection_info.current_message_length = int.from_bytes(current_message_length, 'little') 1755 connection_info.current_input_memoryview_offset += self.message_size_len 1756 connection_info.current_input_memoryview_diff -= self.message_size_len 1757 connection_info.current_input_memoryview_message_nbytes = 0 1758 # elif (connection_info.current_input_memoryview_diff + connection_info.raw_input_from_client.size()) \ 1759 # >= self.message_size_len: 1760 # current_message_length_part_0 = \ 1761 # memoryview(connection_info.current_input_memoryview.obj)[ 1762 # connection_info.current_input_memoryview_offset:self.message_size_len] 1763 # current_message_length_part_1_len = \ 1764 # self.message_size_len - connection_info.current_input_memoryview_diff 1765 # current_message_length_part_1 = connection_info.raw_input_from_client.get_data() 1766 # pass 1767 else: 1768 break 1769 1770 current_message = connection_info.raw_input_from_client.get_data( 1771 connection_info.current_message_length) 1772 if current_message is None: 1773 break 1774 else: 1775 connection_info.input_from_client.put(current_message) 1776 connection_info.current_message_length = None 1777 result = True 1778 1779 if (not connection_info.current_input_memoryview) and connection_info.current_input_memoryview_diff: 1780 connection_info.raw_input_from_client.add_piece_of_data( 1781 memoryview(connection_info.current_input_memoryview.obj)[ 1782 connection_info.current_input_memoryview_offset:]) 1783 1784 return result 1785 1786 def _send_message_through_connection(self, client_info: Connection, data): 1787 if client_info.conn.existence: 1788 # if len(data) <= self.largest_small_output_data_block_size: 1789 # client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little') + data) 1790 # else: 1791 # client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1792 # client_info.output_to_client.put(data) 1793 client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1794 client_info.output_to_client.put(data) 1795 1796 self._output_check_sockets.add(client_info.conn.result) 1797 else: 1798 if __debug__: 1799 self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1800 raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!') 1801 1802 def _generate_list_of_messages_with_their_length(self, messages_list): 1803 for message in messages_list: 1804 yield len(message).to_bytes(self.message_size_len, 'little') 1805 yield message 1806 1807 def _send_messages_through_connection(self, client_info: Connection, messages_list): 1808 if client_info.conn.existence: 1809 # for message in messages_list: 1810 # client_info.output_to_client.put(len(message).to_bytes(self.message_size_len, 'little')) 1811 # client_info.output_to_client.put(message) 1812 client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list)) 1813 1814 self._output_check_sockets.add(client_info.conn.result) 1815 else: 1816 if __debug__: 1817 self._log( 1818 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1819 raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!') 1820 1821 def _send_message_through_connection_raw(self, client_info: Connection, data): 1822 if client_info.conn.existence: 1823 client_info.output_to_client.put(data) 1824 self._output_check_sockets.add(client_info.conn.result) 1825 else: 1826 if __debug__: 1827 self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1828 raise Exception( 1829 'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1830 1831 def _send_messages_through_connection_raw(self, client_info: Connection, messages_list): 1832 if client_info.conn.existence: 1833 client_info.output_to_client.extend(messages_list) 1834 self._output_check_sockets.add(client_info.conn.result) 1835 else: 1836 if __debug__: 1837 self._log( 1838 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1839 raise Exception( 1840 'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1841 1842 def _move_message_from_fifo_to_memoryview(self, client_info: Connection): 1843 if client_info.current_output_memoryview is None: 1844 if client_info.output_to_client.size(): 1845 client_info.current_output_memoryview = memoryview(client_info.output_to_client.get()) 1846 1847 # @profile 1848 def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1849 output_fifo_size = client_info.output_to_client.size() 1850 if output_fifo_size > 1: 1851 result_data, result_size, result_qnt = \ 1852 client_info.output_to_client.get_at_least_size(524288) 1853 if result_qnt > 1: 1854 client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1855 else: 1856 client_info.current_output_memoryview = memoryview(result_data.popleft()) 1857 elif output_fifo_size == 1: 1858 client_info.current_output_memoryview = memoryview(client_info.output_to_client.get()) 1859 1860 # # try: 1861 # if client_info.output_to_client.size(): 1862 # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(524288) 1863 # # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(1048576) 1864 # # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(5242880) 1865 # if result_qnt > 1: 1866 # client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1867 # else: 1868 # client_info.current_output_memoryview = memoryview(result_data.popleft()) 1869 # # client_info.current_output_memoryview = \ 1870 # # memoryview(client_info.output_to_client.get_at_least_size(524288)[0].popleft()) 1871 # # except FIFOIsEmpty: 1872 # # pass 1873 1874 # # @profile 1875 # def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1876 # # if client_info.current_output_memoryview is None: 1877 # fifo_size = client_info.output_to_client.size() 1878 # # fifo_size = client_info.output_to_client._useful_size 1879 # if fifo_size: 1880 # # if fifo_size >= 3: 1881 # # client_info.current_output_memoryview = \ 1882 # # memoryview(b''.join(client_info.output_to_client._l[client_info.output_to_client._offset:])) 1883 # # client_info.output_to_client = copy.copy(client_info.output_to_client) 1884 # # else: 1885 # # client_info.current_output_memoryview = \ 1886 # # memoryview(client_info.output_to_client.get()) 1887 # if fifo_size >= 2: 1888 # fifo_list = client_info.output_to_client._l 1889 # offs = client_info.output_to_client._offset 1890 # fifo_size_under = get_num_pieces_with_full_size_lesser_than_with_offset( 1891 # fifo_list, 524288, offs) 1892 # if not fifo_size_under: 1893 # fifo_size_under = 1 1894 # if fifo_size_under > 1: 1895 # client_info.current_output_memoryview = memoryview( 1896 # b''.join(fifo_list[offs:offs + fifo_size_under])) 1897 # client_info.output_to_client._free(fifo_size_under) 1898 # else: 1899 # client_info.current_output_memoryview = \ 1900 # memoryview(client_info.output_to_client.get()) 1901 # else: 1902 # client_info.current_output_memoryview = \ 1903 # memoryview(client_info.output_to_client.get()) 1904 1905 # # @profile 1906 # def _consolidate_raw_messages_in_input_from_client_fifo(self, client_info: Connection): 1907 # fifo_size = client_info.input_from_client.size() 1908 # if fifo_size: 1909 # if fifo_size >= 2: 1910 # fifo_list = client_info.input_from_client._l 1911 # offs = client_info.input_from_client._offset 1912 # fifo_size_under = get_num_pieces_with_full_size_lesser_than_with_offset( 1913 # fifo_list, 3 * 1024, offs) 1914 # if not fifo_size_under: 1915 # fifo_size_under = 1 1916 # if fifo_size_under > 1: 1917 # result_message = b''.join(fifo_list[offs:offs + fifo_size_under]) 1918 # client_info.input_from_client._free(fifo_size_under) 1919 # client_info.input_from_client.put(result_message) 1920 1921 def _process_client_keyword(self, client_socket: socket.socket): 1922 connection_id = self._connection_by_conn[client_socket] 1923 connection_info = self._connections[connection_id] 1924 1925 if connection_info.input_from_client.size() >= 0: 1926 expected_client_id = None 1927 expected_client_info = None 1928 1929 this_is_super_server_client = False 1930 # if connection_info.conn.result in self._conns_of_expected_clients: 1931 # expected_client_id = self._conns_of_expected_clients[connection_info.conn.result] 1932 # expected_client_info = self._expected_clients[expected_client_id] 1933 # if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1934 # this_is_super_server_client = True 1935 1936 # if connection_info.connected_expected_client_id is not None: 1937 # expected_client_id = connection_info.connected_expected_client_id 1938 # expected_client_info = self._expected_clients[expected_client_id] 1939 # if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1940 # this_is_super_server_client = True 1941 if connection_info.connected_expected_client is not None: 1942 expected_client_info = connection_info.connected_expected_client 1943 expected_client_id = expected_client_info.id 1944 if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1945 this_is_super_server_client = True 1946 1947 if this_is_super_server_client: 1948 # This is connection to Super-Server. So we expect an answer like b'OK' 1949 super_server_answer__keyword_accepted = connection_info.input_from_client.get() 1950 super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted) 1951 if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted: 1952 # Answer was acceptable 1953 self._unconfirmed_clients.remove(client_socket) 1954 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1955 if expected_client_info.will_use_raw_client_connection: 1956 connection_info.this_is_raw_connection = True 1957 else: 1958 # Answer was NOT acceptable 1959 self._mark_connection_to_be_closed_immediately(connection_info) 1960 self._mark_connection_as_ready_for_deletion(connection_info) 1961 if __debug__: 1962 self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format( 1963 super_server_answer__keyword_accepted)) 1964 else: 1965 # This is connection to client. So we expect a keyword 1966 keyword = connection_info.input_from_client.get() 1967 keyword = bytes(keyword) 1968 connection_info.keyword = keyword 1969 self._unconfirmed_clients.remove(client_socket) 1970 self._send_message_through_connection(connection_info, self.server_answer__keyword_accepted) 1971 1972 if keyword in self._keywords_for_expected_clients: 1973 # empty expected client was already registered 1974 expected_client_id = self._keywords_for_expected_clients[keyword] 1975 expected_client_info = self._expected_clients[expected_client_id] 1976 expected_client_info.connection_id = connection_id 1977 expected_client_info._Client__connection = connection_info 1978 self._conns_of_expected_clients[client_socket] = expected_client_id 1979 connection_info.connected_expected_client_id = expected_client_id 1980 connection_info.connected_expected_client = expected_client_info 1981 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1982 if expected_client_info.will_use_raw_client_connection: 1983 connection_info.this_is_raw_connection = True 1984 else: 1985 # it is unknown expected client 1986 if self.unexpected_clients_are_allowed: 1987 self._add_unexpected_client(connection_id, keyword) 1988 else: 1989 self._mark_connection_to_be_closed_immediately(connection_info) 1990 self._mark_connection_as_ready_for_deletion(connection_info) 1991 1992 def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1993 # connection_id = self._connection_by_conn[readable_socket] 1994 client_info = self._connections[self._connection_by_conn[readable_socket]] 1995 # if readable_socket in self._conns_of_expected_clients: 1996 # expected_client_id = self._conns_of_expected_clients[readable_socket] 1997 # if client_info.input_from_client.size(): 1998 # self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 1999 if client_info.connected_expected_client_id is not None: 2000 if client_info.input_from_client.size(): 2001 self._io_iteration_result.clients_have_data_to_read.add( 2002 client_info.connected_expected_client_id) 2003 if client_info.has_inline_processor: 2004 self._inline_processor__on__data_received(client_info) 2005 2006 def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 2007 if readable_socket in self._conns_of_expected_clients: 2008 expected_client_id = self._conns_of_expected_clients[readable_socket] 2009 expected_client = self._expected_clients[expected_client_id] 2010 self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 2011 connection_info = expected_client._Client__connection 2012 # connection_info = self._connections.get(expected_client.connection_id) 2013 if connection_info.has_inline_processor: 2014 self._inline_processor__on__data_received(connection_info) 2015 2016 def _inline_processor__apply_parameters(self, connection_info: Connection, 2017 expected_client: Client): 2018 inline_processor = expected_client.obj_for_inline_processing 2019 2020 inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode 2021 connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode 2022 2023 if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately: 2024 inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False 2025 self._mark_connection_to_be_closed_immediately(connection_info) 2026 2027 if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed: 2028 inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False 2029 self._mark_connection_as_ready_to_be_closed(connection_info) 2030 2031 def _inline_processor__init_parameters(self, connection_info: Connection, 2032 expected_client: Client): 2033 inline_processor = expected_client.obj_for_inline_processing 2034 2035 inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection 2036 inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection 2037 2038 def _inline_processor__on__data_received(self, connection_info: Connection): 2039 expected_client = connection_info.connected_expected_client 2040 inline_processor = expected_client.obj_for_inline_processing 2041 2042 # if not callable(getattr(inline_processor, 'on__data_received', None)): 2043 # return False 2044 2045 try: 2046 while connection_info.input_from_client.size(): 2047 inline_processor.on__data_received(connection_info.input_from_client.get()) 2048 2049 if inline_processor.output_messages: 2050 while inline_processor.output_messages: 2051 another_message = inline_processor.output_messages.popleft() 2052 if not connection_info.this_is_raw_connection: 2053 connection_info.output_to_client.put( 2054 len(another_message).to_bytes(self.message_size_len, 'little')) 2055 connection_info.output_to_client.put(another_message) 2056 self._output_check_sockets.add(connection_info.conn.result) 2057 if connection_info.output_to_client.get_data_full_size() >= 65536: 2058 # self._write_data_to_socket(connection_info.conn.result) 2059 self._write_data_to_socket(connection_info) 2060 return True 2061 except: 2062 self.remove_client(expected_client.id) 2063 exc = sys.exc_info() 2064 exception = exc 2065 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2066 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2067 exception = exception[:2] + (formatted_traceback,) 2068 trace_str = ''.join(exception[2]) 2069 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2070 if __debug__: 2071 self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string)) 2072 2073 return False 2074 2075 def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection, 2076 expected_client: Client): 2077 inline_processor = expected_client.obj_for_inline_processing 2078 2079 if not connection_info.has_inline_processor: 2080 return False 2081 2082 # if not callable(getattr(inline_processor, 'on__output_buffers_are_empty', None)): 2083 # return False 2084 2085 try: 2086 inline_processor.on__output_buffers_are_empty() 2087 2088 if inline_processor.output_messages: 2089 while inline_processor.output_messages: 2090 another_message = inline_processor.output_messages.popleft() 2091 if not connection_info.this_is_raw_connection: 2092 connection_info.output_to_client.put( 2093 len(another_message).to_bytes(self.message_size_len, 'little')) 2094 connection_info.output_to_client.put(another_message) 2095 self._output_check_sockets.add(connection_info.conn.result) 2096 if connection_info.output_to_client.get_data_full_size() >= 65536: 2097 # self._write_data_to_socket(connection_info.conn.result) 2098 self._write_data_to_socket(connection_info) 2099 return True 2100 2101 # residual_data = inline_processor.output_messages.get_data_full_size() 2102 # if residual_data: 2103 # output_messages, result_size, result_qnt = \ 2104 # inline_processor.output_messages.get_at_least_size(residual_data) 2105 # if connection_info.this_is_raw_connection: 2106 # self._send_messages_through_connection_raw(connection_info, output_messages) 2107 # else: 2108 # self._send_messages_through_connection(connection_info, output_messages) 2109 # self._write_data_to_socket(connection_info.conn.result) 2110 # # self._io_iteration_result.clients_with_empty_output_fifo.remove( 2111 # # connection_info.connected_expected_client_id) 2112 # result = True 2113 2114 # self._inline_processor__apply_parameters(connection_info, expected_client) 2115 # self._clients_with_inline_processors_that_need_to_apply_parameters.add(expected_client.id) 2116 except: 2117 self.remove_client(expected_client.id) 2118 exc = sys.exc_info() 2119 exception = exc 2120 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2121 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2122 exception = exception[:2] + (formatted_traceback,) 2123 trace_str = ''.join(exception[2]) 2124 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2125 if __debug__: 2126 self._log( 2127 'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string)) 2128 2129 return False 2130 2131 def _inline_processor__on__connection_lost(self, connection_info: Connection, 2132 expected_client: Client): 2133 inline_processor = expected_client.obj_for_inline_processing 2134 2135 if not connection_info.has_inline_processor: 2136 return False 2137 2138 # if not callable(getattr(inline_processor, 'on__connection_lost', None)): 2139 # return False 2140 2141 try: 2142 inline_processor.on__connection_lost() 2143 except: 2144 exc = sys.exc_info() 2145 exception = exc 2146 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2147 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2148 exception = exception[:2] + (formatted_traceback,) 2149 trace_str = ''.join(exception[2]) 2150 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2151 if __debug__: 2152 self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string)) 2153 self.remove_client(expected_client.id) 2154 2155 def _inline_processor__on__connection_lost_by_connection_id(self, connection_id): 2156 connection_info = self._connections[connection_id] 2157 expected_client_info = connection_info.connected_expected_client 2158 if (expected_client_info is not None) and connection_info.has_inline_processor: 2159 self._inline_processor__on__connection_lost(connection_info, expected_client_info) 2160 self._io_iteration_result.clients_with_disconnected_connection.remove( 2161 expected_client_info.id) 2162 2163 def _unlink_good_af_unix_sockets(self): 2164 if 'posix' == os.name: 2165 for gate_connection_settings in self.gates_connections_settings: 2166 if gate_connection_settings.socket_family == socket.AF_UNIX: 2167 try: 2168 os.unlink(gate_connection_settings.socket_address) 2169 except: 2170 if __debug__: 2171 self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format( 2172 gate_connection_settings.socket_address)) 2173 raise 2174 2175 def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings): 2176 if 'posix' == os.name: 2177 if connection_settings.socket_family == socket.AF_UNIX: 2178 if os.path.exists(connection_settings.socket_address): 2179 # try: 2180 # os.unlink(connection_settings.socket_address) 2181 # except OSError: 2182 # if __debug__: self._log('EXCEPTION: INITIATION: GATE: CAN\'T UNLINK EXISTING AF_UNIX SOCKET: {}'.format( 2183 # connection_settings.socket_address)) 2184 # raise 2185 if __debug__: 2186 self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format( 2187 connection_settings.socket_address)) 2188 2189 2190@contextmanager 2191def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool = False, backlog: int = 1): 2192 try: 2193 gate_connections_num = asock_io_core_obj.listen(backlog) 2194 if should_have_gate_connections and (not gate_connections_num): 2195 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!' 2196 asock_io_core_obj._log(error_text) 2197 raise Exception(error_text) 2198 else: 2199 print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num)) 2200 yield asock_io_core_obj 2201 except: 2202 raise 2203 finally: 2204 asock_io_core_obj.io_iteration() 2205 asock_io_core_obj.destroy() 2206 asock_io_core_obj.io_iteration() 2207 if __debug__: 2208 print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150])) 2209 if __debug__: 2210 print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150])) 2211 # print('RECV BUFF SIZES: {}'.format(asock_io_core_obj.recv_buff_sizes)) 2212 # print('RECV SIZES: {}'.format(asock_io_core_obj.recv_sizes))
59class IoIterationResult: 60 """ 61 ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки 62 (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых 63 сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать 64 новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве 65 случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, 66 при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля 67 успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же 68 считывание имеющихся результатов операций))) 69 """ 70 71 def __init__(self): 72 self.newly_connected_expected_clients = set() 73 self.newly_connected_unknown_clients = set() 74 self.clients_with_disconnected_connection = set() 75 self.clients_have_data_to_read = set() 76 self.clients_with_empty_output_fifo = set()
([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же считывание имеющихся результатов операций)))
79class ASockIOCore(NetIOCallbacks, ASockIOCoreMemoryManagement): 80 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 81 """ 82 Port should not be open to a external world! 83 :param gates_connections_settings: set() of ConnectionSettings() 84 :return: 85 """ 86 super(ASockIOCore, self).__init__() 87 88 self.gates_connections_settings = gates_connections_settings 89 if not self.gates_connections_settings: 90 self.gates_connections_settings = set() 91 # raise Exception('gates_connections_settings should be provided!') 92 for gates_connections_settings in self.gates_connections_settings: 93 gates_connections_settings.check() 94 self.faulty_connection_settings = set() 95 self._connection_settings_by_gate_conn = dict() 96 97 self.set_of_gate_addresses = set() 98 self._gate = set() 99 self.reuse_gate_addr = False 100 self.reuse_gate_port = False 101 102 self.message_size_len = MESSAGE_SIZE_LEN 103 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 104 # self.largest_small_output_data_block_size = 1024 * 4 - self.message_size_len 105 106 self._connections = dict() # key: ID; data: Connection() 107 self._connection_by_conn = dict() # key: conn; data: ID 108 self._connections_id_gen = IDGenerator() 109 110 self._connections_marked_as_ready_to_be_closed = set() 111 self._connections_marked_to_be_closed_immediately = set() 112 self._connections_marked_as_ready_to_be_deleted = set() 113 114 self._unconfirmed_clients = set() 115 116 self._we_have_connections_for_select = False 117 self._input_check_sockets = set() 118 self._output_check_sockets = set() 119 self._exception_check_sockets = set() 120 121 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 122 self._expected_clients = dict() # key: ID; data: Client() 123 self._expected_clients_id_gen = IDGenerator() 124 self._keywords_for_expected_clients = dict() # key: keyword; data: info 125 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 126 127 self.unexpected_clients_are_allowed = True 128 129 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 130 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 131 self._unexpected_clients = dict() # key: ID; data: Client() 132 self._unexpected_clients_id_gen = IDGenerator() 133 self._keywords_of_unexpected_clients = dict() 134 self._conns_of_unexpected_clients = dict() 135 136 self._io_iteration_result = IoIterationResult() 137 138 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 139 self.need_to_auto_check_incoming_raw_connection = False 140 self.unknown_clients_are_allowed = False 141 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 142 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 143 144 self.echo_log = False 145 self._internal_log = deque() 146 147 self.recv_sizes = deque() 148 self.recv_buff_sizes = deque() 149 150 self.should_get_client_addr_info_on_connection = True 151 152 self.use_nodelay_inet = False 153 self.use_speed_optimized_socket_read = False 154 155 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 156 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 157 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 158 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 159 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 160 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 161 162 self.class_for_unknown_clients_inline_processing = None 163 164 self._clients_with_inline_processors_that_need_to_apply_parameters = set() 165 166 # @profile 167 def io_iteration(self, timeout=0.0): 168 """ 169 170 :param timeout: timeout in seconds 171 :return: 172 """ 173 result = self._io_iteration_result 174 175 if self._we_have_connections_for_select: 176 need_to_repeat = True 177 178 while need_to_repeat: 179 readable, writable, exceptional = select.select(self._input_check_sockets, 180 self._output_check_sockets, 181 self._exception_check_sockets, 182 timeout) 183 read_is_forbidden = True 184 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 185 <= self.global_in__data_size_limit.result: 186 read_is_forbidden = False 187 188 # Handle inputs 189 for s in readable: 190 read_result = self._read_data_from_socket(s) 191 if read_result: 192 # read_result = self._read_messages_from_raw_input_into_fifo(s) 193 # if read_result: 194 if s in self._unconfirmed_clients: 195 self._process_client_keyword(s) 196 self._check_is_client_have_data_to_read_in_fifo(s) 197 else: 198 self._client_have_data_to_read_in_fifo(s) 199 200 if __debug__: 201 read_is_forbidden_test = \ 202 self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 203 read_is_forbidden) 204 if read_is_forbidden_test is not None: 205 if read_is_forbidden_test: 206 print('Read is suppressed until data will be processed.') 207 else: 208 print('Read is allowed: data is processed.') 209 210 # Handle outputs 211 for s in writable: 212 curr_client_info = self._connections[self._connection_by_conn[s]] 213 self._write_data_to_socket(curr_client_info) 214 # self._write_data_to_socket(s) 215 216 # Handle "exceptional conditions" 217 for s in exceptional: 218 self._handle_connection_error(s) 219 220 # Set parameters for inline processors 221 if self._clients_with_inline_processors_that_need_to_apply_parameters: 222 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 223 expected_client_info = self._expected_clients[ec_id] 224 connection_info = expected_client_info._Client__connection 225 # connection_info = self._connections.get(expected_client_info.connection_id) 226 self._inline_processor__apply_parameters(connection_info, expected_client_info) 227 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 228 229 # Close sockets 230 if self._connections_marked_to_be_closed_immediately: 231 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 232 self._connections_marked_to_be_closed_immediately = set() 233 for closeable_socket in sockets_should_be_closed_immediately: 234 connection_id = self._connection_by_conn[closeable_socket] 235 # self.close_connection_by_conn(closeable_socket) 236 self.close_connection(connection_id) 237 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 238 239 # Removing clients 240 if self._connections_marked_as_ready_to_be_deleted: 241 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 242 self._connections_marked_as_ready_to_be_deleted = set() 243 for faulty_socket in clients_ready_to_be_deleted: 244 self.remove_connection_by_conn(faulty_socket) 245 246 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 247 <= self.global_out__data_size_limit.result: 248 need_to_repeat = False 249 else: 250 need_to_repeat = True 251 252 need_to_repeat = False 253 254 if __debug__: 255 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 256 need_to_repeat) 257 if need_to_repeat_show is not None: 258 if need_to_repeat_show: 259 print('Work is suppressed until data will be out.') 260 else: 261 print('Work is allowed: data is out.') 262 263 self._io_iteration_result = IoIterationResult() 264 return result 265 266 def listen(self, backlog=1): 267 # backlog = backlog or 1 268 269 new_connection_settings = set() 270 for gate_connection_settings in self.gates_connections_settings: 271 gate = self.add_gate(gate_connection_settings) 272 if gate: 273 new_connection_settings.add(gate) 274 # gate = None 275 # try: 276 # try: 277 # gate = socket.socket(gate_connection_settings.socket_family, 278 # gate_connection_settings.socket_type, 279 # gate_connection_settings.socket_protocol, 280 # gate_connection_settings.socket_fileno) 281 # gate.setblocking(0) 282 # # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 283 # except (socket.error, OSError) as err: 284 # gate = None 285 # if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 286 # err.errno, err.strerror)) 287 # raise InternalNotCriticalError() 288 # 289 # if self.reuse_gate_port: 290 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 291 # 292 # try: 293 # self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 294 # gate.bind(gate_connection_settings.socket_address) 295 # except (socket.error, OSError) as err: 296 # gate.close() 297 # gate = None 298 # if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 299 # gate_connection_settings.socket_address, err.errno, err.strerror)) 300 # raise InternalNotCriticalError() 301 # try: 302 # gate.listen(backlog) 303 # except (socket.error, OSError) as err: 304 # gate.close() 305 # gate = None 306 # if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 307 # gate_connection_settings.socket_address, err.errno, err.strerror)) 308 # raise InternalNotCriticalError() 309 # 310 # if self.reuse_gate_addr: 311 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 312 # 313 # self._input_check_sockets.add(gate) 314 # self._exception_check_sockets.add(gate) 315 # except InternalNotCriticalError as err: 316 # gate = None 317 # finally: 318 # if gate: 319 # self._gate.add(gate) 320 # if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 321 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 322 # elif socket.AF_UNIX == gate_connection_settings.socket_family: 323 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 324 # else: 325 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 326 # self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 327 # self._connection_settings_by_gate_conn[gate] = gate_connection_settings 328 # self._we_have_connections_for_select = True 329 # new_connection_settings.add(gate_connection_settings) 330 # else: 331 # self.faulty_connection_settings.add(gate_connection_settings) 332 self.gates_connections_settings = new_connection_settings 333 334 return len(self.gates_connections_settings) 335 336 def add_gate(self, gate_connection_settings: ConnectionSettings): 337 gate_connection_settings.check() 338 339 gate = None 340 try: 341 try: 342 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 343 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 344 gate.setblocking(0) 345 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 346 except (socket.error, OSError) as err: 347 gate = None 348 if __debug__: 349 self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 350 err.errno, err.strerror)) 351 raise InternalNotCriticalError() 352 353 if self.reuse_gate_port: 354 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 355 356 try: 357 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 358 gate.bind(gate_connection_settings.socket_address) 359 except (socket.error, OSError) as err: 360 gate.close() 361 gate = None 362 if __debug__: 363 self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 364 gate_connection_settings.socket_address, err.errno, err.strerror)) 365 raise InternalNotCriticalError() 366 try: 367 gate.listen(gate_connection_settings.backlog) 368 except (socket.error, OSError) as err: 369 gate.close() 370 gate = None 371 if __debug__: 372 self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 373 gate_connection_settings.socket_address, err.errno, err.strerror)) 374 raise InternalNotCriticalError() 375 376 if self.reuse_gate_addr: 377 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 378 379 self._input_check_sockets.add(gate) 380 self._exception_check_sockets.add(gate) 381 except InternalNotCriticalError as err: 382 gate = None 383 finally: 384 if gate: 385 self._gate.add(gate) 386 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 387 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 388 elif socket.AF_UNIX == gate_connection_settings.socket_family: 389 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 390 else: 391 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 392 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 393 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 394 self._we_have_connections_for_select = True 395 return gate_connection_settings 396 else: 397 self.faulty_connection_settings.add(gate_connection_settings) 398 return None 399 400 def close_all_connections(self): 401 if __debug__: 402 self._log('CLOSE ALL CONNECTIONS:') 403 clients_list = dict(self._connections) 404 for connection_id, client_info in clients_list.items(): 405 self.close_connection(connection_id) 406 407 def remove_all_connections(self): 408 clients_list = dict(self._connections) 409 for connection_id, client_info in clients_list.items(): 410 self.remove_connection(connection_id) 411 412 def close(self): 413 for gate in self._gate: 414 gate.close() 415 416 if gate in self._input_check_sockets: 417 self._input_check_sockets.remove(gate) 418 if gate in self._exception_check_sockets: 419 self._exception_check_sockets.remove(gate) 420 421 if not self._input_check_sockets: 422 self._we_have_connections_for_select = False 423 self._unlink_good_af_unix_sockets() 424 425 def destroy(self): 426 self.close_all_connections() 427 self.remove_all_connections() 428 self.close() 429 430 def add_client(self, expected_client_info: Client): 431 """ 432 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 433 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 434 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 435 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 436 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 437 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 438 439 :param expected_client_info: link to Client() 440 :return: expected_client_id 441 """ 442 if (expected_client_info.connection_settings.keyword is None) \ 443 and (ConnectionType.active_accepted == expected_client_info.connection_settings.connection_type): 444 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 445 446 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 447 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 448 expected_client_info.connection_settings.keyword)) 449 450 expected_client_info.id = self._expected_clients_id_gen() 451 452 if self.unexpected_clients_are_allowed: 453 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 454 # клиент уже подключен 455 unexpected_client_id = self._keywords_of_unexpected_clients[ 456 expected_client_info.connection_settings.keyword] 457 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 458 connection_info = expected_client_info._Client__connection 459 # connection_info = self._connections.get(expected_client_info.connection_id) 460 if (unexpected_client_info.connection_settings.connection_type == 461 expected_client_info.connection_settings.connection_type) and \ 462 (ConnectionType.active_accepted == unexpected_client_info.connection_settings.connection_type): 463 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 464 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 465 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 466 # подключение. 467 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 468 # отключен и соединение будет установлено в новом сокете. 469 expected_client_info.connection_id = unexpected_client_info.connection_id 470 expected_client_info._Client__connection = \ 471 unexpected_client_info._Client__connection 472 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 473 connection_info.connected_expected_client_id = expected_client_info.id 474 connection_info.connected_expected_client = expected_client_info 475 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 476 else: 477 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 478 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 479 # Необходимо его отключить. 480 self._mark_connection_to_be_closed_immediately(connection_info) 481 self._mark_connection_as_ready_for_deletion(connection_info) 482 self._remove_unexpected_client(unexpected_client_id) 483 else: 484 # клиент еще не подключен 485 expected_client_info.connection_id = None 486 expected_client_info._Client__connection = None 487 488 if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 489 self._connect_to_super_server(expected_client_info) 490 491 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 492 self._expected_clients[expected_client_info.id] = expected_client_info 493 494 return expected_client_info.id 495 496 def get_client_id_by_keyword(self, expected_client_keyword): 497 """ 498 :param expected_client_keyword: expected_client_keyword 499 :return: link to Client() 500 """ 501 return self._keywords_for_expected_clients[expected_client_keyword] 502 503 def get_client_info(self, expected_client_id): 504 """ 505 :param expected_client_id: expected_client_id 506 :return: link to Client() 507 """ 508 return self._expected_clients[expected_client_id] 509 510 def get_connection_input_fifo_size_for_client(self, expected_client_id): 511 expected_client_info = self._expected_clients[expected_client_id] 512 connection_info = expected_client_info._Client__connection 513 # connection_info = self._connections.get(expected_client_info.connection_id) 514 if connection_info is None: 515 raise Exception('Expected client was not connected yet!') 516 # if client_info.this_is_raw_connection: 517 # if client_info.input_from_client.size(): 518 # return 1 519 # else: 520 # return 0 521 # else: 522 # return client_info.input_from_client.size() 523 return connection_info.input_from_client.size() 524 525 def get_message_from_client(self, expected_client_id): 526 expected_client_info = self._expected_clients[expected_client_id] 527 connection_info = expected_client_info._Client__connection 528 # connection_info = self._connections.get(expected_client_info.connection_id) 529 if connection_info is None: 530 raise Exception('Expected client was not connected yet!') 531 if not connection_info.input_from_client.size(): 532 raise Exception('There is no readable data in expected client\'s FIFO!') 533 # if client_info.this_is_raw_connection: 534 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 535 return connection_info.input_from_client.get() 536 537 # @profile 538 def get_messages_from_client(self, expected_client_id): 539 expected_client_info = self._expected_clients[expected_client_id] 540 connection_info = expected_client_info._Client__connection 541 # connection_info = self._connections.get(expected_client_info.connection_id) 542 if connection_info is None: 543 raise Exception('Expected client was not connected yet!') 544 try: 545 while True: 546 yield connection_info.input_from_client.get() 547 except FIFOIsEmpty: 548 pass 549 # while client_info.input_from_client.size(): 550 # yield client_info.input_from_client.get() 551 552 def get_connection_output_fifo_size_for_client(self, expected_client_id): 553 expected_client_info = self._expected_clients[expected_client_id] 554 connection_info = expected_client_info._Client__connection 555 # connection_info = self._connections.get(expected_client_info.connection_id) 556 if connection_info is None: 557 raise Exception('Expected client was not connected yet!') 558 return connection_info.output_to_client.size() 559 560 def send_message_to_client(self, expected_client_id, data): 561 # data = bytes(data) 562 expected_client_info = self._expected_clients[expected_client_id] 563 connection_info = expected_client_info._Client__connection 564 # connection_info = self._connections.get(expected_client_info.connection_id) 565 if connection_info is None: 566 raise Exception('Expected client was not connected yet!') 567 if connection_info.this_is_raw_connection: 568 self._send_message_through_connection_raw(connection_info, data) 569 else: 570 self._send_message_through_connection(connection_info, data) 571 572 def send_messages_to_client(self, expected_client_id, messages_list): 573 # data = bytes(data) 574 expected_client_info = self._expected_clients[expected_client_id] 575 connection_info = expected_client_info._Client__connection 576 # connection_info = self._connections.get(expected_client_info.connection_id) 577 if connection_info is None: 578 raise Exception('Expected client was not connected yet!') 579 if connection_info.this_is_raw_connection: 580 self._send_messages_through_connection_raw(connection_info, messages_list) 581 else: 582 self._send_messages_through_connection(connection_info, messages_list) 583 584 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 585 expected_client_info = self._expected_clients[expected_client_id] 586 connection_info = expected_client_info._Client__connection 587 # connection_info = self._connections.get(expected_client_info.connection_id) 588 if connection_info is None: 589 raise Exception('Expected client was not connected yet!') 590 return connection_info.this_is_raw_connection 591 592 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 593 expected_client_info = self._expected_clients[expected_client_id] 594 connection_info = expected_client_info._Client__connection 595 # connection_info = self._connections.get(expected_client_info.connection_id) 596 if connection_info is None: 597 raise Exception('Expected client was not connected yet!') 598 connection_info.this_is_raw_connection = is_raw 599 600 def set_inline_processor_for_client(self, expected_client_id, 601 class_for_unknown_clients_inline_processing: type): 602 expected_client_info = self._expected_clients[expected_client_id] 603 connection_info = expected_client_info._Client__connection 604 # connection_info = self._connections.get(expected_client_info.connection_id) 605 if connection_info is None: 606 raise Exception('Expected client was not connected yet!') 607 self._set_inline_processor_for_client(connection_info, expected_client_info, 608 class_for_unknown_clients_inline_processing) 609 610 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 611 """ 612 Connection will be closed immediately (inside this method) 613 :param expected_client_id: 614 :param raise_if_already_closed: 615 :return: 616 """ 617 if __debug__: 618 self._log('CLOSE EXPECTED CLIENT SOCKET:') 619 expected_client_info = self._expected_clients[expected_client_id] 620 connection_info = expected_client_info._Client__connection 621 # connection_info = self._connections.get(expected_client_info.connection_id) 622 if raise_if_already_closed and (connection_info is None): 623 raise Exception('Expected client was not connected yet!') 624 self.close_connection(connection_info.id) 625 626 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 627 raise_if_already_closed=True): 628 """ 629 Connection will be closed immediately (inside main IO loop) 630 :param expected_client_id: 631 :param raise_if_already_closed: 632 :return: 633 """ 634 if __debug__: 635 self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 636 expected_client_info = self._expected_clients[expected_client_id] 637 connection_info = expected_client_info._Client__connection 638 # connection_info = self._connections.get(expected_client_info.connection_id) 639 if raise_if_already_closed and (connection_info is None): 640 raise Exception('Expected client was not connected yet!') 641 self._mark_connection_to_be_closed_immediately(connection_info) 642 643 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 644 """ 645 Connection will be closed when all output will be sent (inside main IO loop). 646 :param expected_client_id: 647 :param raise_if_already_closed: 648 :return: 649 """ 650 if __debug__: 651 self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 652 expected_client_info = self._expected_clients[expected_client_id] 653 connection_info = expected_client_info._Client__connection 654 # connection_info = self._connections.get(expected_client_info.connection_id) 655 if raise_if_already_closed and (connection_info is None): 656 raise Exception('Expected client was not connected yet!') 657 self._mark_connection_as_ready_to_be_closed(connection_info) 658 659 def remove_client(self, expected_client_id): 660 if __debug__: 661 self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 662 expected_client_info = self._expected_clients[expected_client_id] 663 if __debug__: 664 self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 665 connection_id = expected_client_info.connection_id 666 if connection_id is None: 667 self._remove_client(expected_client_id) 668 self.remove_connection(connection_id) 669 670 def add_connection(self, conn, address): 671 """ 672 :param conn: socket 673 :param address: address 674 :return: client ID 675 """ 676 if conn is None: 677 raise TypeError('conn should not be None!') 678 679 conn.setblocking(0) 680 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 681 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 682 683 new_client_id = self._connections_id_gen() 684 685 client_info = Connection(new_client_id, (conn, address), self) 686 self._connections[new_client_id] = client_info 687 self._connection_by_conn[conn] = new_client_id 688 self._input_check_sockets.add(conn) 689 self._exception_check_sockets.add(conn) 690 self._we_have_connections_for_select = True 691 692 self._unconfirmed_clients.add(conn) 693 694 return new_client_id 695 696 def close_connection(self, connection_id): 697 if __debug__: 698 self._log('CLOSE CLIENT {}:'.format(connection_id)) 699 client_info = self._connections[connection_id] 700 if not client_info.conn.existence: 701 if __debug__: 702 self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 703 return 704 conn = client_info.conn.result 705 if conn is None: 706 if __debug__: 707 self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 708 return 709 710 conn.close() 711 client_info.conn.existence = False 712 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 713 # memory even before destroying 714 715 if connection_id in self._connections_marked_as_ready_to_be_closed: 716 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 717 if conn in self._connections_marked_to_be_closed_immediately: 718 self._connections_marked_to_be_closed_immediately.remove(conn) 719 if conn in self._connection_by_conn: 720 del self._connection_by_conn[conn] 721 if conn in self._input_check_sockets: 722 self._input_check_sockets.remove(conn) 723 if conn in self._output_check_sockets: 724 self._output_check_sockets.remove(conn) 725 if conn in self._exception_check_sockets: 726 self._exception_check_sockets.remove(conn) 727 728 if not self._input_check_sockets: 729 self._we_have_connections_for_select = False 730 731 if __debug__: 732 self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id)) 733 734 def close_connection_by_conn(self, conn): 735 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 736 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 737 if __debug__: 738 self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 739 connection_id = self._connection_by_conn[conn] 740 if __debug__: 741 self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 742 self.close_connection(connection_id) 743 744 def remove_connection(self, connection_id): 745 # client should NOT be removed immediately after connection close (close_connection): 746 # code should do it by itself after reading all available input data 747 if __debug__: 748 self._log('REMOVE CLIENT: {}'.format(connection_id)) 749 client_info = self._connections[connection_id] 750 if __debug__: 751 self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 752 if client_info.conn.existence: 753 self.close_connection(connection_id) 754 conn = client_info.conn.result 755 if conn is None: 756 return 757 758 if conn in self._conns_of_unexpected_clients: 759 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 760 761 # if conn in self._conns_of_expected_clients: 762 # self._remove_client(self._conns_of_expected_clients[conn]) 763 if client_info.connected_expected_client_id is not None: 764 self._remove_client(client_info.connected_expected_client_id) 765 766 client_info.connected_expected_client_id = None 767 client_info.connected_expected_client = None 768 769 if connection_id in self._connections_marked_as_ready_to_be_deleted: 770 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 771 772 client_info.conn.existence = False 773 client_info.conn.result = None 774 775 del self._connections[connection_id] 776 if conn in self._connection_by_conn: 777 del self._connection_by_conn[conn] 778 if conn in self._input_check_sockets: 779 self._input_check_sockets.remove(conn) 780 if conn in self._output_check_sockets: 781 self._output_check_sockets.remove(conn) 782 if conn in self._exception_check_sockets: 783 self._exception_check_sockets.remove(conn) 784 785 # client_info.remove() 786 787 def remove_connection_by_conn(self, conn): 788 connection_id = self._connection_by_conn[conn] 789 self.remove_connection(connection_id) 790 791 def _log(self, log_string): 792 self._internal_log.append(log_string) 793 if self.echo_log: 794 print(log_string) 795 796 def _create_unknown_client_from_connection(self, client_info: Connection): 797 keyword = None 798 keyword_is_ok = False 799 while not keyword_is_ok: 800 keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode() 801 if keyword not in self._keywords_for_expected_clients: 802 keyword_is_ok = True 803 connection_settings = ConnectionSettings(connection_type=ConnectionType.active_accepted, keyword=keyword) 804 expected_client_info = Client(connection_settings) 805 expected_client_info.id = self._expected_clients_id_gen() 806 807 expected_client_info.connection_id = client_info.id 808 expected_client_info._Client__connection = client_info 809 expected_client_info.will_use_raw_client_connection = True 810 expected_client_info.will_use_raw_connection_without_handshake = True 811 expected_client_info.this_is_unknown_client = True 812 if self.class_for_unknown_clients_inline_processing is not None: 813 self._set_inline_processor_for_client(client_info, expected_client_info, 814 self.class_for_unknown_clients_inline_processing) 815 # assert type(self.class_for_unknown_clients_inline_processing) == type, \ 816 # '.class_for_unknown_clients_inline_processing must be a class or None' 817 # expected_client_info.obj_for_inline_processing = self.class_for_unknown_clients_inline_processing( 818 # keyword, client_info.conn.result.family, client_info.conn.result.type, client_info.conn.result.proto, 819 # copy.copy(client_info.addr_info), copy.copy(client_info.host_names) 820 # ) 821 # client_info.has_inline_processor = True 822 # self._inline_processor__init_parameters(client_info, expected_client_info) 823 824 self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id 825 client_info.connected_expected_client_id = expected_client_info.id 826 client_info.connected_expected_client = expected_client_info 827 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 828 self._expected_clients[expected_client_info.id] = expected_client_info 829 830 self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id) 831 832 return expected_client_info.id 833 834 def _set_inline_processor_for_client(self, connection_info: Connection, 835 expected_client_info: Client, 836 class_for_unknown_clients_inline_processing: type): 837 assert type(class_for_unknown_clients_inline_processing) == type, \ 838 '.class_for_unknown_clients_inline_processing must be a class or None' 839 keyword = expected_client_info.connection_settings.keyword 840 expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing( 841 expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type, 842 connection_info.conn.result.proto, copy.copy(connection_info.addr_info), 843 copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters 844 ) 845 connection_info.has_inline_processor = True 846 self._inline_processor__init_parameters(connection_info, expected_client_info) 847 848 def _connect_to_super_server(self, expected_client_info: Client): 849 """ 850 Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()). 851 Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую 852 логику через select/poll/epoll: 853 http://man7.org/linux/man-pages/man2/connect.2.html 854 :param expected_client_info: 855 :return: 856 """ 857 connection_settings = expected_client_info.connection_settings 858 conn = None 859 try: 860 conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type, 861 connection_settings.socket_protocol, connection_settings.socket_fileno) 862 # conn.setblocking(0) 863 except (socket.error, OSError) as err: 864 if __debug__: 865 self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format( 866 err.errno, err.strerror)) 867 raise 868 869 try: 870 conn.connect(connection_settings.socket_address) 871 except (socket.error, OSError) as err: 872 conn.close() 873 if __debug__: 874 self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format( 875 connection_settings.socket_address, err.errno, err.strerror)) 876 raise 877 878 super_server_client_id = self.add_connection(conn, connection_settings.socket_address) 879 880 addr_info = host_names = None 881 try: 882 if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS): 883 addr_info = socket.getaddrinfo(connection_settings.socket_address[0], 884 connection_settings.socket_address[1]) 885 host_names = socket.gethostbyaddr(connection_settings.socket_address[0]) 886 except ConnectionError as err: 887 # An established connection was aborted by the software in your host machine 888 if __debug__: 889 self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 890 if __debug__: 891 self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 892 connection_settings.socket_address, err.errno, err.strerror)) 893 self._mark_connection_to_be_closed_immediately(super_server_client_id) 894 ok = False 895 except (socket.error, OSError) as err: 896 if __debug__: 897 self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 898 connection_settings.socket_address, err.errno, err.strerror)) 899 if err.errno in SET_OF_CONNECTION_ERRORS: 900 # An established connection was aborted by the software in your host machine 901 if __debug__: 902 self._log( 903 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 904 if __debug__: 905 self._log( 906 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 907 connection_settings.socket_address, err.errno, err.strerror)) 908 self._mark_connection_to_be_closed_immediately(super_server_client_id) 909 ok = False 910 else: 911 if 'nt' == os.name: 912 if errno.WSAECONNRESET == err.errno: 913 # An existing connection was forcibly closed by the remote host 914 if __debug__: 915 self._log( 916 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 917 if __debug__: 918 self._log( 919 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 920 connection_settings.socket_address, err.errno, err.strerror)) 921 self._mark_connection_to_be_closed_immediately(super_server_client_id) 922 ok = False 923 else: 924 raise err 925 else: 926 raise err 927 928 super_server_client_info = self._connections[super_server_client_id] 929 super_server_client_info.addr_info = addr_info 930 super_server_client_info.host_names = host_names 931 self._log_new_connection(super_server_client_info, False) 932 933 if expected_client_info.will_use_raw_connection_without_handshake: 934 # Connection is made without handshake 935 super_server_client_info.this_is_raw_connection = True 936 self._unconfirmed_clients.remove(super_server_client_info.conn.result) 937 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 938 else: 939 keyword = expected_client_info.connection_settings.keyword 940 if keyword is None: 941 keyword = self._get_own_keyword_appropriate_for_connection(conn) 942 if keyword is None: 943 # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления 944 self._mark_connection_to_be_closed_immediately(super_server_client_info) 945 self._mark_connection_as_ready_for_deletion(super_server_client_info) 946 raise Exception('Own keyword should be provided in connection_settings!') 947 if keyword: 948 # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу). 949 self._send_message_through_connection(super_server_client_info, keyword) 950 951 expected_client_info.connection_id = super_server_client_id 952 expected_client_info._Client__connection = super_server_client_info 953 self._conns_of_expected_clients[conn] = expected_client_info.id 954 super_server_client_info.connected_expected_client_id = expected_client_info.id 955 super_server_client_info.connected_expected_client = expected_client_info 956 957 def _get_own_keyword_appropriate_for_connection(self, conn): 958 keyword = None 959 random_keyword = None 960 for gate_connection_settings in self.gates_connections_settings: 961 random_keyword = gate_connection_settings.keyword 962 if (conn.family == gate_connection_settings.socket_family) and ( 963 conn.type == gate_connection_settings.socket_type) and \ 964 (conn.proto == gate_connection_settings.socket_protocol): 965 keyword = gate_connection_settings.keyword 966 if keyword is None: 967 keyword = random_keyword 968 return keyword 969 970 def _pack_message(self, data): 971 return len(data).to_bytes(self.message_size_len, 'little') + data 972 973 def _remove_client(self, expected_client_id): 974 expected_client_info = self._expected_clients[expected_client_id] 975 del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] 976 del self._expected_clients[expected_client_id] 977 connection_info = expected_client_info._Client__connection 978 # connection_info = self._connections.get(expected_client_info.connection_id) 979 if connection_info is not None: 980 # you can remove expected client before it will be connected to the server 981 del self._conns_of_expected_clients[connection_info.conn.result] 982 expected_client_info.connection_id = None 983 expected_client_info._Client__connection = None 984 985 def _add_unexpected_client(self, connection_id, keyword): 986 connection_settings = ConnectionSettings(connection_type=ConnectionType.active_accepted, keyword=keyword) 987 unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(), 988 connection_id) 989 990 self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info 991 self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id 992 # connection_info = self._connections[unexpected_client_info.connection_id] 993 connection_info = unexpected_client_info._Client__connection 994 # connection_info = self._connections.get(unexpected_client_info.connection_id) 995 self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id 996 997 return unexpected_client_info.id 998 999 def _remove_unexpected_client(self, unexpected_client_id): 1000 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 1001 # connection_info = self._connections[unexpected_client_info.connection_id] # unexpected client appears only after 1002 # connection 1003 connection_info = unexpected_client_info._Client__connection # unexpected client appears only after 1004 # connection 1005 # connection_info = self._connections.get(unexpected_client_info.connection_id) # unexpected client appears 1006 # # only after connection 1007 del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword] 1008 del self._unexpected_clients[unexpected_client_id] 1009 del self._conns_of_unexpected_clients[connection_info.conn.result] 1010 unexpected_client_info.connection_id = None 1011 unexpected_client_info._Client__connection = None 1012 1013 # @profile 1014 def _accept_new_connection(self, readable_socket: socket.socket): 1015 # One of a "readable" self._gate sockets is ready to accept a connection 1016 ok = True 1017 while ok: 1018 connection_id = None 1019 client_info = None 1020 1021 connection = None 1022 client_address = None 1023 try: 1024 connection, client_address = readable_socket.accept() 1025 connection_id = self.add_connection(connection, client_address) 1026 client_info = self._connections[connection_id] 1027 except BlockingIOError as err: 1028 ok = False 1029 except InterruptedError as err: 1030 pass 1031 except (socket.error, OSError) as err: 1032 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1033 ok = False 1034 elif errno.EINTR == err.errno: 1035 pass 1036 elif errno.ECONNABORTED == err.errno: 1037 ok = False 1038 elif errno.EMFILE == err.errno: 1039 if __debug__: 1040 self._log( 1041 'The per-process limit on the number of open file descriptors had been reached.') 1042 ok = False 1043 elif errno.ENFILE == err.errno: 1044 if __debug__: 1045 self._log( 1046 'The system-wide limit on the total number of open files had been reached.') 1047 ok = False 1048 elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno): 1049 if __debug__: 1050 self._log( 1051 'Not enough free memory. Allocation is limited by the socket buffer limits.') 1052 ok = False 1053 elif errno.EPROTO == err.errno: 1054 if __debug__: 1055 self._log('Protocol error.') 1056 ok = False 1057 elif errno.EPERM == err.errno: 1058 if __debug__: 1059 self._log('Firewall rules forbid connection.') 1060 ok = False 1061 else: 1062 raise err 1063 1064 if (connection is not None) and (client_info is not None): 1065 addr_info = host_names = None 1066 try: 1067 if self.should_get_client_addr_info_on_connection and \ 1068 (connection.family in INET_TYPE_CONNECTIONS): 1069 addr_info = socket.getaddrinfo(client_address[0], client_address[1]) 1070 host_names = socket.gethostbyaddr(client_address[0]) 1071 except ConnectionError as err: 1072 # An established connection was aborted by the software in your host machine 1073 if __debug__: 1074 self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1075 if __debug__: 1076 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1077 client_address, err.errno, err.strerror)) 1078 self._mark_connection_to_be_closed_immediately(client_info) 1079 ok = False 1080 except (socket.error, OSError) as err: 1081 if __debug__: 1082 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1083 client_address, err.errno, err.strerror)) 1084 if err.errno in SET_OF_CONNECTION_ERRORS: 1085 # An established connection was aborted by the software in your host machine 1086 if __debug__: 1087 self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1088 if __debug__: 1089 self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1090 client_address, err.errno, err.strerror)) 1091 self._mark_connection_to_be_closed_immediately(client_info) 1092 ok = False 1093 else: 1094 if 'nt' == os.name: 1095 if errno.WSAECONNRESET == err.errno: 1096 # An existing connection was forcibly closed by the remote host 1097 if __debug__: 1098 self._log( 1099 'CLOSING {}: Connection reset by peer'.format(client_address)) 1100 if __debug__: 1101 self._log( 1102 'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1103 client_address, err.errno, err.strerror)) 1104 self._mark_connection_to_be_closed_immediately(client_info) 1105 ok = False 1106 else: 1107 raise err 1108 else: 1109 raise err 1110 1111 client_info.addr_info = addr_info 1112 client_info.host_names = host_names 1113 self._log_new_connection(client_info, True) 1114 1115 if self.need_to_auto_check_incoming_raw_connection \ 1116 and self.raw_checker_for_new_incoming_connections(self, client_info): 1117 # Is should be RAW: 1118 if self.unknown_clients_are_allowed: 1119 # Unknown clients are allowed - create Unknown Expected Client for this connection 1120 client_info.this_is_raw_connection = True 1121 self._unconfirmed_clients.remove(client_info.conn.result) 1122 self._create_unknown_client_from_connection(client_info) 1123 else: 1124 # Unknown clients are Non allowed - close connection. 1125 if __debug__: 1126 self._log( 1127 'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format( 1128 client_info.addr.result 1129 )) 1130 self._mark_connection_to_be_closed_immediately(client_info) 1131 1132 # @profile 1133 def _read_data_from_already_connected_socket__inner__memory_optimized(self, connection_info: Connection, 1134 possible_messages_in_client_input_fifo=False): 1135 ok = True 1136 1137 data = connection_info.conn.result.recv(connection_info.recv_buff_size) 1138 data_len = len(data) 1139 # self.recv_buff_sizes.append(connection_info.recv_buff_size) 1140 # self.recv_sizes.append(data_len) 1141 connection_info.calc_new_recv_buff_size(data_len) 1142 # if __debug__: self._log('received "{}" from {}'.format( 1143 # data, connection_info.addr.result)) 1144 if data: 1145 # if data_len > 4096: 1146 # data = memoryview(data) 1147 data = memoryview(data) 1148 if connection_info.this_is_raw_connection: 1149 connection_info.input_from_client.put(data) 1150 possible_messages_in_client_input_fifo = True 1151 else: 1152 connection_info.raw_input_from_client.add_piece_of_data(data) 1153 # possible_messages_in_client_input_fifo = True 1154 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1155 connection_info) 1156 else: 1157 # Interpret empty result as closed connection 1158 if __debug__: 1159 self._log( 1160 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1161 self._mark_connection_to_be_closed_immediately(connection_info) 1162 ok = False 1163 1164 result = (ok, possible_messages_in_client_input_fifo) 1165 return result 1166 1167 # @profile 1168 def _read_data_from_already_connected_socket__inner__speed_optimized(self, connection_info: Connection, 1169 possible_messages_in_client_input_fifo=False): 1170 # print(current_line()) 1171 ok = True 1172 1173 if connection_info.current_input_memoryview: 1174 # print(current_line()) 1175 nbytes = connection_info.conn.result.recv_into(connection_info.current_input_memoryview) 1176 # print('len(connection_info.current_input_memoryview):', len(connection_info.current_input_memoryview)) 1177 # print('nbytes', nbytes) 1178 # self.recv_buff_sizes.append(len(connection_info.current_input_memoryview)) 1179 # self.recv_sizes.append(nbytes) 1180 if nbytes > 0: 1181 # print(current_line()) 1182 # data = bytes(connection_info.current_input_memoryview[:nbytes]) 1183 # res_data = bytes(connection_info.current_input_memoryview[nbytes:]) 1184 data = connection_info.current_input_memoryview[:nbytes] 1185 connection_info.current_input_memoryview = connection_info.current_input_memoryview[nbytes:] 1186 1187 if connection_info.this_is_raw_connection: 1188 # print(current_line()) 1189 connection_info.input_from_client.put(data) 1190 possible_messages_in_client_input_fifo = True 1191 else: 1192 # print(current_line()) 1193 connection_info.raw_input_from_client.add_piece_of_data(data) 1194 # possible_messages_in_client_input_fifo = True 1195 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1196 connection_info) 1197 else: 1198 # print(current_line()) 1199 # Interpret empty result as closed connection 1200 if __debug__: 1201 self._log( 1202 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1203 self._mark_connection_to_be_closed_immediately(connection_info) 1204 ok = False 1205 else: 1206 connection_info.current_input_memoryview = memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1207 1208 result = (ok, possible_messages_in_client_input_fifo) 1209 return result 1210 1211 # @profile 1212 def _read_data_from_already_connected_socket__inner__speed_optimized_for_messages( 1213 self, connection_info: Connection, possible_messages_in_client_input_fifo=False): 1214 ok = True 1215 1216 if connection_info.current_input_memoryview: 1217 nbytes = connection_info.conn.result.recv_into(connection_info.current_input_memoryview) 1218 if nbytes > 0: 1219 # data = connection_info.current_input_memoryview[:nbytes] 1220 connection_info.current_input_memoryview = connection_info.current_input_memoryview[nbytes:] 1221 connection_info.current_input_memoryview_nbytes += nbytes 1222 connection_info.current_input_memoryview_diff += nbytes 1223 1224 if connection_info.this_is_raw_connection: 1225 data = connection_info.current_input_memoryview[:nbytes] 1226 connection_info.input_from_client.put(data) 1227 possible_messages_in_client_input_fifo = True 1228 else: 1229 # connection_info.raw_input_from_client.add_piece_of_data(data) 1230 # possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1231 # connection_info) 1232 possible_messages_in_client_input_fifo = self._read_messages_from_input_memoryview_into_raw_input( 1233 connection_info) 1234 # if not connection_info.current_input_memoryview: 1235 # connection_info.raw_input_from_client.add_piece_of_data( 1236 # memoryview(connection_info.current_input_memoryview.obj)[ 1237 # connection_info.current_input_memoryview_offset:]) 1238 # # Закоментировано, т.к. будет вызвано при следующем чтении (см. код ниже): 1239 # # connection_info.current_input_memoryview = \ 1240 # # memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1241 # # connection_info.current_input_memoryview_offset = 0 1242 # # connection_info.current_input_memoryview_nbytes = 0 1243 # # connection_info.current_input_memoryview_diff = 0 1244 else: 1245 if __debug__: 1246 self._log( 1247 'CLOSING {} after reading no data:'.format(connection_info.addr.result)) 1248 self._mark_connection_to_be_closed_immediately(connection_info) 1249 ok = False 1250 else: 1251 connection_info.current_input_memoryview = memoryview(bytearray(self.socket_read_fixed_buffer_size.result)) 1252 connection_info.current_input_memoryview_offset = 0 1253 connection_info.current_input_memoryview_nbytes = 0 1254 connection_info.current_input_memoryview_diff = 0 1255 1256 result = (ok, possible_messages_in_client_input_fifo) 1257 return result 1258 1259 # @profile 1260 def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket): 1261 # print(current_line()) 1262 possible_messages_in_client_input_fifo = False 1263 1264 curr_client_id = self._connection_by_conn[readable_socket] 1265 connection_info = self._connections[curr_client_id] 1266 1267 ok = True 1268 while ok: 1269 # print(current_line()) 1270 try: 1271 if self.use_speed_optimized_socket_read: 1272 ok, possible_messages_in_client_input_fifo = \ 1273 self._read_data_from_already_connected_socket__inner__speed_optimized( 1274 connection_info, possible_messages_in_client_input_fifo) 1275 else: 1276 ok, possible_messages_in_client_input_fifo = \ 1277 self._read_data_from_already_connected_socket__inner__memory_optimized( 1278 connection_info, possible_messages_in_client_input_fifo) 1279 break # makes IO faster on 10-30% on all modes (message/raw, data/http, 1280 # static_read_buffer/non_static_read_buffer). 1281 # Ускорение происходит за счет того, что при таком разбиении ввод и вывод начинают происходить (на 1282 # уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и пустили в обработку. В это 1283 # время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы. После обработки мы пускаем 1284 # данные на отправку и забираем очередную порцию данных из входного буфера. В это время происходит 1285 # отправка данных из буферов на уровне ОС. И т.д. 1286 except BlockingIOError as err: 1287 # print(current_line()) 1288 ok = False 1289 except InterruptedError as err: 1290 # print(current_line()) 1291 pass 1292 except ConnectionError as err: 1293 # print(current_line()) 1294 # An established connection was aborted by the software in your host machine 1295 if __debug__: 1296 self._log('CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1297 if __debug__: 1298 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1299 connection_info.addr.result, err.errno, err.strerror)) 1300 self._mark_connection_to_be_closed_immediately(connection_info) 1301 ok = False 1302 except (socket.error, OSError) as err: 1303 # print(current_line()) 1304 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1305 # print(current_line()) 1306 ok = False 1307 elif errno.EINTR == err.errno: 1308 # print(current_line()) 1309 pass 1310 elif err.errno in SET_OF_CONNECTION_ERRORS: 1311 # print(current_line()) 1312 # An established connection was aborted by the software in your host machine 1313 if __debug__: 1314 self._log('CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1315 if __debug__: 1316 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1317 connection_info.addr.result, err.errno, err.strerror)) 1318 self._mark_connection_to_be_closed_immediately(connection_info) 1319 ok = False 1320 else: 1321 # print(current_line()) 1322 if 'nt' == os.name: 1323 # print(current_line()) 1324 if errno.WSAECONNRESET == err.errno: 1325 # print(current_line()) 1326 # An existing connection was forcibly closed by the remote host 1327 if __debug__: 1328 self._log( 1329 'CLOSING {}: Connection reset by peer'.format(connection_info.addr.result)) 1330 if __debug__: 1331 self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1332 connection_info.addr.result, err.errno, err.strerror)) 1333 self._mark_connection_to_be_closed_immediately(connection_info) 1334 ok = False 1335 else: 1336 # print(current_line()) 1337 raise err 1338 else: 1339 # print(current_line()) 1340 raise err 1341 1342 read_is_forbidden = False 1343 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 1344 > self.global_in__data_size_limit.result: 1345 read_is_forbidden = True 1346 ok = False 1347 1348 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 1349 read_is_forbidden) 1350 if read_is_forbidden_test is not None: 1351 if read_is_forbidden_test: 1352 print('Read is suppressed until data will be processed.') 1353 else: 1354 print('Read is allowed: data is processed.') 1355 1356 return possible_messages_in_client_input_fifo 1357 1358 # @profile 1359 def _read_data_from_socket(self, readable_socket: socket.socket): 1360 possible_messages_in_client_input_fifo = False 1361 if readable_socket in self._gate: 1362 accept_is_forbidden = True 1363 if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \ 1364 <= self.global__data_size_limit.result: 1365 accept_is_forbidden = False 1366 self._accept_new_connection(readable_socket) 1367 1368 if __debug__: 1369 accept_is_forbidden_test = \ 1370 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger( 1371 accept_is_forbidden) 1372 if accept_is_forbidden_test is not None: 1373 if accept_is_forbidden_test: 1374 print('Accept is suppressed until data will be processed and/or out.') 1375 else: 1376 print('Accept is allowed: data is processed and/or out.') 1377 else: 1378 possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell( 1379 readable_socket) 1380 1381 return possible_messages_in_client_input_fifo 1382 1383 # def _log_new_connection(self, connection, client_address, is_incoming_connection): 1384 def _log_new_connection(self, client_info: Connection, is_incoming_connection): 1385 connection = client_info.conn.result 1386 client_address = client_info.addr.result 1387 addr_info = client_info.addr_info 1388 host_names = client_info.host_names 1389 1390 connection_type_string = 'OUTGOING' 1391 from_to = 'to' 1392 if is_incoming_connection: 1393 connection_type_string = 'INCOMING' 1394 from_to = 'from' 1395 1396 if __debug__: 1397 self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address)) 1398 if connection.family in {socket.AF_INET, socket.AF_INET6}: 1399 if __debug__: 1400 self._log('\taddr_info: {}'.format(addr_info)) 1401 if __debug__: 1402 self._log('\thost_names: {}'.format(host_names)) 1403 1404 # # @profile 1405 # def _write_data_to_socket(self, writable_socket: socket.socket): 1406 # # Data read from already connected client 1407 # # curr_client_id = self._connection_by_conn[writable_socket] 1408 # # curr_client_info = self._connections[curr_client_id] 1409 # curr_client_info = self._connections[self._connection_by_conn[writable_socket]] 1410 # if curr_client_info.should_be_closed: 1411 # curr_client_info.current_output_memoryview = None 1412 # curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1413 # return 1414 # 1415 # ok = True 1416 # first_pass = True 1417 # can_call__inline_processor__on__output_buffers_are_empty = True 1418 # while ok: 1419 # try: 1420 # if curr_client_info.current_output_memoryview: 1421 # nsent = writable_socket.send(curr_client_info.current_output_memoryview) 1422 # # if __debug__: self._log('sending "{}" to {}'.format( 1423 # # curr_client_info.current_output_memoryview[:nsent], curr_client_info.addr.result)) 1424 # curr_client_info.current_output_memoryview = curr_client_info.current_output_memoryview[nsent:] 1425 # else: 1426 # curr_client_info.current_output_memoryview = None 1427 # # self._move_message_from_fifo_to_memoryview(curr_client_info) 1428 # # self._consolidate_and_move_messages_from_fifo_to_memoryview(curr_client_info) 1429 # output_fifo_size = curr_client_info.output_to_client.size() 1430 # if output_fifo_size > 1: 1431 # result_data, result_size, result_qnt = \ 1432 # curr_client_info.output_to_client.get_at_least_size(524288) 1433 # if result_qnt > 1: 1434 # curr_client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1435 # else: 1436 # curr_client_info.current_output_memoryview = memoryview(result_data.popleft()) 1437 # elif output_fifo_size == 1: 1438 # curr_client_info.current_output_memoryview = memoryview(curr_client_info.output_to_client.get()) 1439 # 1440 # if curr_client_info.current_output_memoryview is None: 1441 # if not curr_client_info.ready_to_be_closed: 1442 # # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1443 # # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1444 # # сохранить сокет в списке проверяемых для отправки. 1445 # self._output_check_sockets.remove(writable_socket) 1446 # if first_pass and curr_client_info.ready_to_be_closed: 1447 # # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1448 # # Это значит что все данные были отправлены, и можно закрывать соединение. 1449 # self._output_check_sockets.remove(writable_socket) 1450 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1451 # # if curr_client_info.ready_to_be_closed: 1452 # # self._mark_connection_to_be_closed_immediately(curr_client_info) 1453 # # if writable_socket in self._conns_of_expected_clients: 1454 # # expected_client_id = self._conns_of_expected_clients[writable_socket] 1455 # # self._io_iteration_result.clients_with_empty_output_fifo.add(expected_client_id) 1456 # ok = False 1457 # if curr_client_info.connected_expected_client_id is not None: 1458 # self._io_iteration_result.clients_with_empty_output_fifo.add( 1459 # curr_client_info.connected_expected_client_id) 1460 # if curr_client_info.has_inline_processor: 1461 # if can_call__inline_processor__on__output_buffers_are_empty: 1462 # if self._inline_processor__on__output_buffers_are_empty( 1463 # curr_client_info, curr_client_info.connected_expected_client): 1464 # ok = True 1465 # can_call__inline_processor__on__output_buffers_are_empty = False 1466 # except BlockingIOError as err: 1467 # ok = False 1468 # except InterruptedError as err: 1469 # pass 1470 # except ConnectionError as err: 1471 # # An established connection was aborted by the software in your host machine 1472 # if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1473 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1474 # curr_client_info.addr.result, err.errno, err.strerror)) 1475 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1476 # ok = False 1477 # except (socket.error, OSError) as err: 1478 # if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1479 # ok = False 1480 # elif errno.EINTR == err.errno: 1481 # pass 1482 # elif err.errno in SET_OF_CONNECTION_ERRORS: 1483 # # Connection reset by peer 1484 # if __debug__: self._log('CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1485 # err.strerror)) 1486 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1487 # curr_client_info.addr.result, err.errno, err.strerror)) 1488 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1489 # ok = False 1490 # else: 1491 # if 'nt' == os.name: 1492 # if errno.WSAECONNRESET == err.errno: 1493 # # An existing connection was forcibly closed by the remote host 1494 # if __debug__: self._log( 1495 # 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1496 # if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1497 # curr_client_info.addr.result, err.errno, err.strerror)) 1498 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1499 # ok = False 1500 # else: 1501 # raise err 1502 # else: 1503 # raise err 1504 # first_pass = False 1505 1506 # @profile 1507 def _write_data_to_socket(self, curr_client_info: Connection): 1508 # CAUTION: code here is optimized for speed - not for readability or beauty. 1509 1510 expected_client_info = curr_client_info.connected_expected_client 1511 writable_socket = curr_client_info.conn.result 1512 if curr_client_info.should_be_closed: 1513 curr_client_info.current_output_memoryview = None 1514 curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1515 return 1516 1517 ok = True 1518 first_pass = True 1519 can_call__inline_processor__on__output_buffers_are_empty = True 1520 while ok: 1521 try: 1522 if curr_client_info.current_output_memoryview: 1523 nsent = writable_socket.send(curr_client_info.current_output_memoryview) 1524 # if __debug__: self._log('sending "{}" to {}'.format( 1525 # curr_client_info.current_output_memoryview[:nsent], curr_client_info.addr.result)) 1526 curr_client_info.current_output_memoryview = curr_client_info.current_output_memoryview[nsent:] 1527 else: 1528 curr_client_info.current_output_memoryview = None 1529 # self._move_message_from_fifo_to_memoryview(curr_client_info) 1530 # self._consolidate_and_move_messages_from_fifo_to_memoryview(curr_client_info) 1531 output_fifo_size = curr_client_info.output_to_client.size() 1532 if output_fifo_size > 1: 1533 result_data, result_size, result_qnt = \ 1534 curr_client_info.output_to_client.get_at_least_size(524288) 1535 if result_qnt > 1: 1536 curr_client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1537 else: 1538 curr_client_info.current_output_memoryview = memoryview(result_data.popleft()) 1539 elif output_fifo_size == 1: 1540 curr_client_info.current_output_memoryview = memoryview(curr_client_info.output_to_client.get()) 1541 1542 if curr_client_info.current_output_memoryview is None: 1543 if not curr_client_info.ready_to_be_closed: 1544 # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1545 # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1546 # сохранить сокет в списке проверяемых для отправки. 1547 self._output_check_sockets.remove(writable_socket) 1548 if first_pass and curr_client_info.ready_to_be_closed: 1549 # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1550 # Это значит что все данные были отправлены, и можно закрывать соединение. 1551 self._output_check_sockets.remove(writable_socket) 1552 self._mark_connection_to_be_closed_immediately(curr_client_info) 1553 # if curr_client_info.ready_to_be_closed: 1554 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1555 # if writable_socket in self._conns_of_expected_clients: 1556 # expected_client_id = self._conns_of_expected_clients[writable_socket] 1557 # self._io_iteration_result.clients_with_empty_output_fifo.add(expected_client_id) 1558 ok = False 1559 if expected_client_info: 1560 self._io_iteration_result.clients_with_empty_output_fifo.add( 1561 curr_client_info.connected_expected_client_id) 1562 if curr_client_info.has_inline_processor: 1563 if can_call__inline_processor__on__output_buffers_are_empty: 1564 if self._inline_processor__on__output_buffers_are_empty(curr_client_info, 1565 expected_client_info): 1566 ok = True 1567 can_call__inline_processor__on__output_buffers_are_empty = False 1568 except BlockingIOError as err: 1569 ok = False 1570 except InterruptedError as err: 1571 pass 1572 except ConnectionError as err: 1573 # An established connection was aborted by the software in your host machine 1574 if __debug__: 1575 self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1576 if __debug__: 1577 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1578 curr_client_info.addr.result, err.errno, err.strerror)) 1579 self._mark_connection_to_be_closed_immediately(curr_client_info) 1580 ok = False 1581 except (socket.error, OSError) as err: 1582 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1583 ok = False 1584 elif errno.EINTR == err.errno: 1585 pass 1586 elif err.errno in SET_OF_CONNECTION_ERRORS: 1587 # Connection reset by peer 1588 if __debug__: 1589 self._log( 1590 'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1591 err.strerror)) 1592 if __debug__: 1593 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1594 curr_client_info.addr.result, err.errno, err.strerror)) 1595 self._mark_connection_to_be_closed_immediately(curr_client_info) 1596 ok = False 1597 else: 1598 if 'nt' == os.name: 1599 if errno.WSAECONNRESET == err.errno: 1600 # An existing connection was forcibly closed by the remote host 1601 if __debug__: 1602 self._log( 1603 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1604 if __debug__: 1605 self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1606 curr_client_info.addr.result, err.errno, err.strerror)) 1607 self._mark_connection_to_be_closed_immediately(curr_client_info) 1608 ok = False 1609 else: 1610 raise err 1611 else: 1612 raise err 1613 first_pass = False 1614 1615 def _mark_connection_to_be_closed_immediately(self, client_info: Connection): 1616 client_info.should_be_closed = True 1617 self._connections_marked_to_be_closed_immediately.add(client_info.conn.result) 1618 # if client_info.conn.result in self._conns_of_expected_clients: 1619 # expected_client_id = self._conns_of_expected_clients[client_info.conn.result] 1620 # self._io_iteration_result.clients_with_disconnected_connection.add(expected_client_id) 1621 if client_info.connected_expected_client_id is not None: 1622 self._io_iteration_result.clients_with_disconnected_connection.add( 1623 client_info.connected_expected_client_id) 1624 1625 def _mark_connection_as_ready_to_be_closed(self, client_info: Connection): 1626 client_info.ready_to_be_closed = True 1627 1628 def _mark_connection_as_ready_for_deletion(self, client_info: Connection): 1629 client_info.ready_for_deletion = True 1630 self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result) 1631 1632 def _handle_connection_error(self, writable_socket: socket.socket): 1633 # Data read from already connected client 1634 curr_client_id = self._connection_by_conn[writable_socket] 1635 curr_client_info = self._connections[curr_client_id] 1636 if curr_client_info.should_be_closed: 1637 return 1638 if __debug__: 1639 self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result)) 1640 self._mark_connection_to_be_closed_immediately(curr_client_info) 1641 1642 # # @profile 1643 # def _read_messages_from_raw_input_into_fifo__cheat(self, readable_socket: socket.socket): 1644 # result = False 1645 # curr_client_id = self._connection_by_conn[readable_socket] 1646 # curr_client_info = self._connections[curr_client_id] 1647 # 1648 # for index in range(curr_client_info.raw_input_from_client.qnt()): 1649 # curr_client_info.input_from_client.put(curr_client_info.raw_input_from_client._data.popleft()) 1650 # result = True 1651 # # for piece in curr_client_info.raw_input_from_client._data: 1652 # # curr_client_info.input_from_client.put(piece) 1653 # # result = True 1654 # 1655 # if result: 1656 # # curr_client_info.raw_input_from_client = DynamicListOfPieces( 1657 # # curr_client_info.raw_input_from_client._joiner, curr_client_info.raw_input_from_client._offset_limit) 1658 # curr_client_info.raw_input_from_client = copy.copy(curr_client_info.raw_input_from_client) 1659 # 1660 # return result 1661 1662 # # @profile 1663 # def _read_messages_from_raw_input_into_fifo(self, readable_socket: socket.socket): 1664 # result = False 1665 # curr_client_id = self._connection_by_conn[readable_socket] 1666 # curr_client_info = self._connections[curr_client_id] 1667 # 1668 # if curr_client_info.this_is_raw_connection: 1669 # if curr_client_info.input_from_client.size(): 1670 # result = True 1671 # return result 1672 # 1673 # while True: 1674 # if curr_client_info.current_message_length is None: 1675 # current_message_length = curr_client_info.raw_input_from_client.read_data(self.message_size_len) 1676 # if current_message_length is not None: 1677 # curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1678 # if curr_client_info.current_message_length is not None: 1679 # current_message = curr_client_info.raw_input_from_client.get_data( 1680 # self.message_size_len + curr_client_info.current_message_length) 1681 # if current_message is not None: 1682 # curr_client_info.current_message_length = None 1683 # curr_client_info.input_from_client.put(current_message[self.message_size_len:]) 1684 # result = True 1685 # else: 1686 # break 1687 # else: 1688 # break 1689 # return result 1690 1691 # @profile 1692 # def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection): 1693 # result = False 1694 # if curr_client_info.this_is_raw_connection: 1695 # if curr_client_info.input_from_client.size(): 1696 # # print('curr_client_info.input_from_client.size(): ', curr_client_info.input_from_client.size()) 1697 # result = True 1698 # return result 1699 # 1700 # while True: 1701 # if curr_client_info.current_message_length is None: 1702 # current_message_length = curr_client_info.raw_input_from_client.read_data(self.message_size_len) 1703 # if current_message_length is not None: 1704 # curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1705 # if curr_client_info.current_message_length is not None: 1706 # current_message = curr_client_info.raw_input_from_client.get_data( 1707 # self.message_size_len + curr_client_info.current_message_length) 1708 # if current_message is not None: 1709 # curr_client_info.current_message_length = None 1710 # curr_client_info.input_from_client.put(current_message[self.message_size_len:]) 1711 # result = True 1712 # else: 1713 # break 1714 # else: 1715 # break 1716 # return result 1717 1718 # @profile 1719 def _read_messages_from_raw_input_into_fifo(self, connection_info: Connection) -> bool: 1720 result = False 1721 # if connection_info.this_is_raw_connection: 1722 # if connection_info.input_from_client.size(): 1723 # # print('connection_info.input_from_client.size(): ', connection_info.input_from_client.size()) 1724 # result = True 1725 # return result 1726 1727 try: 1728 while True: 1729 if connection_info.current_message_length is None: 1730 current_message_length = connection_info.raw_input_from_client.get_data(self.message_size_len) 1731 connection_info.current_message_length = int.from_bytes(current_message_length, 'little') 1732 1733 current_message = connection_info.raw_input_from_client.get_data( 1734 connection_info.current_message_length) 1735 if current_message is None: 1736 break 1737 else: 1738 connection_info.input_from_client.put(current_message) 1739 connection_info.current_message_length = None 1740 result = True 1741 except TypeError: 1742 # TODO: what it means? 1743 pass 1744 1745 return result 1746 1747 def _read_messages_from_input_memoryview_into_raw_input(self, connection_info: Connection) -> bool: 1748 result = False 1749 1750 while True: 1751 if connection_info.current_message_length is None: 1752 if connection_info.current_input_memoryview_diff >= self.message_size_len: 1753 current_message_length = memoryview(connection_info.current_input_memoryview.obj)[ 1754 connection_info.current_input_memoryview_offset:self.message_size_len] 1755 connection_info.current_message_length = int.from_bytes(current_message_length, 'little') 1756 connection_info.current_input_memoryview_offset += self.message_size_len 1757 connection_info.current_input_memoryview_diff -= self.message_size_len 1758 connection_info.current_input_memoryview_message_nbytes = 0 1759 # elif (connection_info.current_input_memoryview_diff + connection_info.raw_input_from_client.size()) \ 1760 # >= self.message_size_len: 1761 # current_message_length_part_0 = \ 1762 # memoryview(connection_info.current_input_memoryview.obj)[ 1763 # connection_info.current_input_memoryview_offset:self.message_size_len] 1764 # current_message_length_part_1_len = \ 1765 # self.message_size_len - connection_info.current_input_memoryview_diff 1766 # current_message_length_part_1 = connection_info.raw_input_from_client.get_data() 1767 # pass 1768 else: 1769 break 1770 1771 current_message = connection_info.raw_input_from_client.get_data( 1772 connection_info.current_message_length) 1773 if current_message is None: 1774 break 1775 else: 1776 connection_info.input_from_client.put(current_message) 1777 connection_info.current_message_length = None 1778 result = True 1779 1780 if (not connection_info.current_input_memoryview) and connection_info.current_input_memoryview_diff: 1781 connection_info.raw_input_from_client.add_piece_of_data( 1782 memoryview(connection_info.current_input_memoryview.obj)[ 1783 connection_info.current_input_memoryview_offset:]) 1784 1785 return result 1786 1787 def _send_message_through_connection(self, client_info: Connection, data): 1788 if client_info.conn.existence: 1789 # if len(data) <= self.largest_small_output_data_block_size: 1790 # client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little') + data) 1791 # else: 1792 # client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1793 # client_info.output_to_client.put(data) 1794 client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1795 client_info.output_to_client.put(data) 1796 1797 self._output_check_sockets.add(client_info.conn.result) 1798 else: 1799 if __debug__: 1800 self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1801 raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!') 1802 1803 def _generate_list_of_messages_with_their_length(self, messages_list): 1804 for message in messages_list: 1805 yield len(message).to_bytes(self.message_size_len, 'little') 1806 yield message 1807 1808 def _send_messages_through_connection(self, client_info: Connection, messages_list): 1809 if client_info.conn.existence: 1810 # for message in messages_list: 1811 # client_info.output_to_client.put(len(message).to_bytes(self.message_size_len, 'little')) 1812 # client_info.output_to_client.put(message) 1813 client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list)) 1814 1815 self._output_check_sockets.add(client_info.conn.result) 1816 else: 1817 if __debug__: 1818 self._log( 1819 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1820 raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!') 1821 1822 def _send_message_through_connection_raw(self, client_info: Connection, data): 1823 if client_info.conn.existence: 1824 client_info.output_to_client.put(data) 1825 self._output_check_sockets.add(client_info.conn.result) 1826 else: 1827 if __debug__: 1828 self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1829 raise Exception( 1830 'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1831 1832 def _send_messages_through_connection_raw(self, client_info: Connection, messages_list): 1833 if client_info.conn.existence: 1834 client_info.output_to_client.extend(messages_list) 1835 self._output_check_sockets.add(client_info.conn.result) 1836 else: 1837 if __debug__: 1838 self._log( 1839 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1840 raise Exception( 1841 'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1842 1843 def _move_message_from_fifo_to_memoryview(self, client_info: Connection): 1844 if client_info.current_output_memoryview is None: 1845 if client_info.output_to_client.size(): 1846 client_info.current_output_memoryview = memoryview(client_info.output_to_client.get()) 1847 1848 # @profile 1849 def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1850 output_fifo_size = client_info.output_to_client.size() 1851 if output_fifo_size > 1: 1852 result_data, result_size, result_qnt = \ 1853 client_info.output_to_client.get_at_least_size(524288) 1854 if result_qnt > 1: 1855 client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1856 else: 1857 client_info.current_output_memoryview = memoryview(result_data.popleft()) 1858 elif output_fifo_size == 1: 1859 client_info.current_output_memoryview = memoryview(client_info.output_to_client.get()) 1860 1861 # # try: 1862 # if client_info.output_to_client.size(): 1863 # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(524288) 1864 # # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(1048576) 1865 # # result_data, result_size, result_qnt = client_info.output_to_client.get_at_least_size(5242880) 1866 # if result_qnt > 1: 1867 # client_info.current_output_memoryview = memoryview(b''.join(result_data)) 1868 # else: 1869 # client_info.current_output_memoryview = memoryview(result_data.popleft()) 1870 # # client_info.current_output_memoryview = \ 1871 # # memoryview(client_info.output_to_client.get_at_least_size(524288)[0].popleft()) 1872 # # except FIFOIsEmpty: 1873 # # pass 1874 1875 # # @profile 1876 # def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1877 # # if client_info.current_output_memoryview is None: 1878 # fifo_size = client_info.output_to_client.size() 1879 # # fifo_size = client_info.output_to_client._useful_size 1880 # if fifo_size: 1881 # # if fifo_size >= 3: 1882 # # client_info.current_output_memoryview = \ 1883 # # memoryview(b''.join(client_info.output_to_client._l[client_info.output_to_client._offset:])) 1884 # # client_info.output_to_client = copy.copy(client_info.output_to_client) 1885 # # else: 1886 # # client_info.current_output_memoryview = \ 1887 # # memoryview(client_info.output_to_client.get()) 1888 # if fifo_size >= 2: 1889 # fifo_list = client_info.output_to_client._l 1890 # offs = client_info.output_to_client._offset 1891 # fifo_size_under = get_num_pieces_with_full_size_lesser_than_with_offset( 1892 # fifo_list, 524288, offs) 1893 # if not fifo_size_under: 1894 # fifo_size_under = 1 1895 # if fifo_size_under > 1: 1896 # client_info.current_output_memoryview = memoryview( 1897 # b''.join(fifo_list[offs:offs + fifo_size_under])) 1898 # client_info.output_to_client._free(fifo_size_under) 1899 # else: 1900 # client_info.current_output_memoryview = \ 1901 # memoryview(client_info.output_to_client.get()) 1902 # else: 1903 # client_info.current_output_memoryview = \ 1904 # memoryview(client_info.output_to_client.get()) 1905 1906 # # @profile 1907 # def _consolidate_raw_messages_in_input_from_client_fifo(self, client_info: Connection): 1908 # fifo_size = client_info.input_from_client.size() 1909 # if fifo_size: 1910 # if fifo_size >= 2: 1911 # fifo_list = client_info.input_from_client._l 1912 # offs = client_info.input_from_client._offset 1913 # fifo_size_under = get_num_pieces_with_full_size_lesser_than_with_offset( 1914 # fifo_list, 3 * 1024, offs) 1915 # if not fifo_size_under: 1916 # fifo_size_under = 1 1917 # if fifo_size_under > 1: 1918 # result_message = b''.join(fifo_list[offs:offs + fifo_size_under]) 1919 # client_info.input_from_client._free(fifo_size_under) 1920 # client_info.input_from_client.put(result_message) 1921 1922 def _process_client_keyword(self, client_socket: socket.socket): 1923 connection_id = self._connection_by_conn[client_socket] 1924 connection_info = self._connections[connection_id] 1925 1926 if connection_info.input_from_client.size() >= 0: 1927 expected_client_id = None 1928 expected_client_info = None 1929 1930 this_is_super_server_client = False 1931 # if connection_info.conn.result in self._conns_of_expected_clients: 1932 # expected_client_id = self._conns_of_expected_clients[connection_info.conn.result] 1933 # expected_client_info = self._expected_clients[expected_client_id] 1934 # if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1935 # this_is_super_server_client = True 1936 1937 # if connection_info.connected_expected_client_id is not None: 1938 # expected_client_id = connection_info.connected_expected_client_id 1939 # expected_client_info = self._expected_clients[expected_client_id] 1940 # if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1941 # this_is_super_server_client = True 1942 if connection_info.connected_expected_client is not None: 1943 expected_client_info = connection_info.connected_expected_client 1944 expected_client_id = expected_client_info.id 1945 if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 1946 this_is_super_server_client = True 1947 1948 if this_is_super_server_client: 1949 # This is connection to Super-Server. So we expect an answer like b'OK' 1950 super_server_answer__keyword_accepted = connection_info.input_from_client.get() 1951 super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted) 1952 if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted: 1953 # Answer was acceptable 1954 self._unconfirmed_clients.remove(client_socket) 1955 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1956 if expected_client_info.will_use_raw_client_connection: 1957 connection_info.this_is_raw_connection = True 1958 else: 1959 # Answer was NOT acceptable 1960 self._mark_connection_to_be_closed_immediately(connection_info) 1961 self._mark_connection_as_ready_for_deletion(connection_info) 1962 if __debug__: 1963 self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format( 1964 super_server_answer__keyword_accepted)) 1965 else: 1966 # This is connection to client. So we expect a keyword 1967 keyword = connection_info.input_from_client.get() 1968 keyword = bytes(keyword) 1969 connection_info.keyword = keyword 1970 self._unconfirmed_clients.remove(client_socket) 1971 self._send_message_through_connection(connection_info, self.server_answer__keyword_accepted) 1972 1973 if keyword in self._keywords_for_expected_clients: 1974 # empty expected client was already registered 1975 expected_client_id = self._keywords_for_expected_clients[keyword] 1976 expected_client_info = self._expected_clients[expected_client_id] 1977 expected_client_info.connection_id = connection_id 1978 expected_client_info._Client__connection = connection_info 1979 self._conns_of_expected_clients[client_socket] = expected_client_id 1980 connection_info.connected_expected_client_id = expected_client_id 1981 connection_info.connected_expected_client = expected_client_info 1982 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1983 if expected_client_info.will_use_raw_client_connection: 1984 connection_info.this_is_raw_connection = True 1985 else: 1986 # it is unknown expected client 1987 if self.unexpected_clients_are_allowed: 1988 self._add_unexpected_client(connection_id, keyword) 1989 else: 1990 self._mark_connection_to_be_closed_immediately(connection_info) 1991 self._mark_connection_as_ready_for_deletion(connection_info) 1992 1993 def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1994 # connection_id = self._connection_by_conn[readable_socket] 1995 client_info = self._connections[self._connection_by_conn[readable_socket]] 1996 # if readable_socket in self._conns_of_expected_clients: 1997 # expected_client_id = self._conns_of_expected_clients[readable_socket] 1998 # if client_info.input_from_client.size(): 1999 # self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 2000 if client_info.connected_expected_client_id is not None: 2001 if client_info.input_from_client.size(): 2002 self._io_iteration_result.clients_have_data_to_read.add( 2003 client_info.connected_expected_client_id) 2004 if client_info.has_inline_processor: 2005 self._inline_processor__on__data_received(client_info) 2006 2007 def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 2008 if readable_socket in self._conns_of_expected_clients: 2009 expected_client_id = self._conns_of_expected_clients[readable_socket] 2010 expected_client = self._expected_clients[expected_client_id] 2011 self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 2012 connection_info = expected_client._Client__connection 2013 # connection_info = self._connections.get(expected_client.connection_id) 2014 if connection_info.has_inline_processor: 2015 self._inline_processor__on__data_received(connection_info) 2016 2017 def _inline_processor__apply_parameters(self, connection_info: Connection, 2018 expected_client: Client): 2019 inline_processor = expected_client.obj_for_inline_processing 2020 2021 inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode 2022 connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode 2023 2024 if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately: 2025 inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False 2026 self._mark_connection_to_be_closed_immediately(connection_info) 2027 2028 if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed: 2029 inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False 2030 self._mark_connection_as_ready_to_be_closed(connection_info) 2031 2032 def _inline_processor__init_parameters(self, connection_info: Connection, 2033 expected_client: Client): 2034 inline_processor = expected_client.obj_for_inline_processing 2035 2036 inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection 2037 inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection 2038 2039 def _inline_processor__on__data_received(self, connection_info: Connection): 2040 expected_client = connection_info.connected_expected_client 2041 inline_processor = expected_client.obj_for_inline_processing 2042 2043 # if not callable(getattr(inline_processor, 'on__data_received', None)): 2044 # return False 2045 2046 try: 2047 while connection_info.input_from_client.size(): 2048 inline_processor.on__data_received(connection_info.input_from_client.get()) 2049 2050 if inline_processor.output_messages: 2051 while inline_processor.output_messages: 2052 another_message = inline_processor.output_messages.popleft() 2053 if not connection_info.this_is_raw_connection: 2054 connection_info.output_to_client.put( 2055 len(another_message).to_bytes(self.message_size_len, 'little')) 2056 connection_info.output_to_client.put(another_message) 2057 self._output_check_sockets.add(connection_info.conn.result) 2058 if connection_info.output_to_client.get_data_full_size() >= 65536: 2059 # self._write_data_to_socket(connection_info.conn.result) 2060 self._write_data_to_socket(connection_info) 2061 return True 2062 except: 2063 self.remove_client(expected_client.id) 2064 exc = sys.exc_info() 2065 exception = exc 2066 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2067 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2068 exception = exception[:2] + (formatted_traceback,) 2069 trace_str = ''.join(exception[2]) 2070 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2071 if __debug__: 2072 self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string)) 2073 2074 return False 2075 2076 def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection, 2077 expected_client: Client): 2078 inline_processor = expected_client.obj_for_inline_processing 2079 2080 if not connection_info.has_inline_processor: 2081 return False 2082 2083 # if not callable(getattr(inline_processor, 'on__output_buffers_are_empty', None)): 2084 # return False 2085 2086 try: 2087 inline_processor.on__output_buffers_are_empty() 2088 2089 if inline_processor.output_messages: 2090 while inline_processor.output_messages: 2091 another_message = inline_processor.output_messages.popleft() 2092 if not connection_info.this_is_raw_connection: 2093 connection_info.output_to_client.put( 2094 len(another_message).to_bytes(self.message_size_len, 'little')) 2095 connection_info.output_to_client.put(another_message) 2096 self._output_check_sockets.add(connection_info.conn.result) 2097 if connection_info.output_to_client.get_data_full_size() >= 65536: 2098 # self._write_data_to_socket(connection_info.conn.result) 2099 self._write_data_to_socket(connection_info) 2100 return True 2101 2102 # residual_data = inline_processor.output_messages.get_data_full_size() 2103 # if residual_data: 2104 # output_messages, result_size, result_qnt = \ 2105 # inline_processor.output_messages.get_at_least_size(residual_data) 2106 # if connection_info.this_is_raw_connection: 2107 # self._send_messages_through_connection_raw(connection_info, output_messages) 2108 # else: 2109 # self._send_messages_through_connection(connection_info, output_messages) 2110 # self._write_data_to_socket(connection_info.conn.result) 2111 # # self._io_iteration_result.clients_with_empty_output_fifo.remove( 2112 # # connection_info.connected_expected_client_id) 2113 # result = True 2114 2115 # self._inline_processor__apply_parameters(connection_info, expected_client) 2116 # self._clients_with_inline_processors_that_need_to_apply_parameters.add(expected_client.id) 2117 except: 2118 self.remove_client(expected_client.id) 2119 exc = sys.exc_info() 2120 exception = exc 2121 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2122 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2123 exception = exception[:2] + (formatted_traceback,) 2124 trace_str = ''.join(exception[2]) 2125 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2126 if __debug__: 2127 self._log( 2128 'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string)) 2129 2130 return False 2131 2132 def _inline_processor__on__connection_lost(self, connection_info: Connection, 2133 expected_client: Client): 2134 inline_processor = expected_client.obj_for_inline_processing 2135 2136 if not connection_info.has_inline_processor: 2137 return False 2138 2139 # if not callable(getattr(inline_processor, 'on__connection_lost', None)): 2140 # return False 2141 2142 try: 2143 inline_processor.on__connection_lost() 2144 except: 2145 exc = sys.exc_info() 2146 exception = exc 2147 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2148 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2149 exception = exception[:2] + (formatted_traceback,) 2150 trace_str = ''.join(exception[2]) 2151 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2152 if __debug__: 2153 self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string)) 2154 self.remove_client(expected_client.id) 2155 2156 def _inline_processor__on__connection_lost_by_connection_id(self, connection_id): 2157 connection_info = self._connections[connection_id] 2158 expected_client_info = connection_info.connected_expected_client 2159 if (expected_client_info is not None) and connection_info.has_inline_processor: 2160 self._inline_processor__on__connection_lost(connection_info, expected_client_info) 2161 self._io_iteration_result.clients_with_disconnected_connection.remove( 2162 expected_client_info.id) 2163 2164 def _unlink_good_af_unix_sockets(self): 2165 if 'posix' == os.name: 2166 for gate_connection_settings in self.gates_connections_settings: 2167 if gate_connection_settings.socket_family == socket.AF_UNIX: 2168 try: 2169 os.unlink(gate_connection_settings.socket_address) 2170 except: 2171 if __debug__: 2172 self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format( 2173 gate_connection_settings.socket_address)) 2174 raise 2175 2176 def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings): 2177 if 'posix' == os.name: 2178 if connection_settings.socket_family == socket.AF_UNIX: 2179 if os.path.exists(connection_settings.socket_address): 2180 # try: 2181 # os.unlink(connection_settings.socket_address) 2182 # except OSError: 2183 # if __debug__: self._log('EXCEPTION: INITIATION: GATE: CAN\'T UNLINK EXISTING AF_UNIX SOCKET: {}'.format( 2184 # connection_settings.socket_address)) 2185 # raise 2186 if __debug__: 2187 self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format( 2188 connection_settings.socket_address))
Callbacks from this class will be called from inside (and by) IOMethodBase loop.
80 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 81 """ 82 Port should not be open to a external world! 83 :param gates_connections_settings: set() of ConnectionSettings() 84 :return: 85 """ 86 super(ASockIOCore, self).__init__() 87 88 self.gates_connections_settings = gates_connections_settings 89 if not self.gates_connections_settings: 90 self.gates_connections_settings = set() 91 # raise Exception('gates_connections_settings should be provided!') 92 for gates_connections_settings in self.gates_connections_settings: 93 gates_connections_settings.check() 94 self.faulty_connection_settings = set() 95 self._connection_settings_by_gate_conn = dict() 96 97 self.set_of_gate_addresses = set() 98 self._gate = set() 99 self.reuse_gate_addr = False 100 self.reuse_gate_port = False 101 102 self.message_size_len = MESSAGE_SIZE_LEN 103 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 104 # self.largest_small_output_data_block_size = 1024 * 4 - self.message_size_len 105 106 self._connections = dict() # key: ID; data: Connection() 107 self._connection_by_conn = dict() # key: conn; data: ID 108 self._connections_id_gen = IDGenerator() 109 110 self._connections_marked_as_ready_to_be_closed = set() 111 self._connections_marked_to_be_closed_immediately = set() 112 self._connections_marked_as_ready_to_be_deleted = set() 113 114 self._unconfirmed_clients = set() 115 116 self._we_have_connections_for_select = False 117 self._input_check_sockets = set() 118 self._output_check_sockets = set() 119 self._exception_check_sockets = set() 120 121 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 122 self._expected_clients = dict() # key: ID; data: Client() 123 self._expected_clients_id_gen = IDGenerator() 124 self._keywords_for_expected_clients = dict() # key: keyword; data: info 125 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 126 127 self.unexpected_clients_are_allowed = True 128 129 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 130 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 131 self._unexpected_clients = dict() # key: ID; data: Client() 132 self._unexpected_clients_id_gen = IDGenerator() 133 self._keywords_of_unexpected_clients = dict() 134 self._conns_of_unexpected_clients = dict() 135 136 self._io_iteration_result = IoIterationResult() 137 138 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 139 self.need_to_auto_check_incoming_raw_connection = False 140 self.unknown_clients_are_allowed = False 141 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 142 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 143 144 self.echo_log = False 145 self._internal_log = deque() 146 147 self.recv_sizes = deque() 148 self.recv_buff_sizes = deque() 149 150 self.should_get_client_addr_info_on_connection = True 151 152 self.use_nodelay_inet = False 153 self.use_speed_optimized_socket_read = False 154 155 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 156 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 157 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 158 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 159 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 160 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 161 162 self.class_for_unknown_clients_inline_processing = None 163 164 self._clients_with_inline_processors_that_need_to_apply_parameters = set()
Port should not be open to a external world! :param gates_connections_settings: set() of ConnectionSettings() :return:
167 def io_iteration(self, timeout=0.0): 168 """ 169 170 :param timeout: timeout in seconds 171 :return: 172 """ 173 result = self._io_iteration_result 174 175 if self._we_have_connections_for_select: 176 need_to_repeat = True 177 178 while need_to_repeat: 179 readable, writable, exceptional = select.select(self._input_check_sockets, 180 self._output_check_sockets, 181 self._exception_check_sockets, 182 timeout) 183 read_is_forbidden = True 184 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 185 <= self.global_in__data_size_limit.result: 186 read_is_forbidden = False 187 188 # Handle inputs 189 for s in readable: 190 read_result = self._read_data_from_socket(s) 191 if read_result: 192 # read_result = self._read_messages_from_raw_input_into_fifo(s) 193 # if read_result: 194 if s in self._unconfirmed_clients: 195 self._process_client_keyword(s) 196 self._check_is_client_have_data_to_read_in_fifo(s) 197 else: 198 self._client_have_data_to_read_in_fifo(s) 199 200 if __debug__: 201 read_is_forbidden_test = \ 202 self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 203 read_is_forbidden) 204 if read_is_forbidden_test is not None: 205 if read_is_forbidden_test: 206 print('Read is suppressed until data will be processed.') 207 else: 208 print('Read is allowed: data is processed.') 209 210 # Handle outputs 211 for s in writable: 212 curr_client_info = self._connections[self._connection_by_conn[s]] 213 self._write_data_to_socket(curr_client_info) 214 # self._write_data_to_socket(s) 215 216 # Handle "exceptional conditions" 217 for s in exceptional: 218 self._handle_connection_error(s) 219 220 # Set parameters for inline processors 221 if self._clients_with_inline_processors_that_need_to_apply_parameters: 222 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 223 expected_client_info = self._expected_clients[ec_id] 224 connection_info = expected_client_info._Client__connection 225 # connection_info = self._connections.get(expected_client_info.connection_id) 226 self._inline_processor__apply_parameters(connection_info, expected_client_info) 227 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 228 229 # Close sockets 230 if self._connections_marked_to_be_closed_immediately: 231 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 232 self._connections_marked_to_be_closed_immediately = set() 233 for closeable_socket in sockets_should_be_closed_immediately: 234 connection_id = self._connection_by_conn[closeable_socket] 235 # self.close_connection_by_conn(closeable_socket) 236 self.close_connection(connection_id) 237 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 238 239 # Removing clients 240 if self._connections_marked_as_ready_to_be_deleted: 241 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 242 self._connections_marked_as_ready_to_be_deleted = set() 243 for faulty_socket in clients_ready_to_be_deleted: 244 self.remove_connection_by_conn(faulty_socket) 245 246 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 247 <= self.global_out__data_size_limit.result: 248 need_to_repeat = False 249 else: 250 need_to_repeat = True 251 252 need_to_repeat = False 253 254 if __debug__: 255 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 256 need_to_repeat) 257 if need_to_repeat_show is not None: 258 if need_to_repeat_show: 259 print('Work is suppressed until data will be out.') 260 else: 261 print('Work is allowed: data is out.') 262 263 self._io_iteration_result = IoIterationResult() 264 return result
:param timeout: timeout in seconds :return:
266 def listen(self, backlog=1): 267 # backlog = backlog or 1 268 269 new_connection_settings = set() 270 for gate_connection_settings in self.gates_connections_settings: 271 gate = self.add_gate(gate_connection_settings) 272 if gate: 273 new_connection_settings.add(gate) 274 # gate = None 275 # try: 276 # try: 277 # gate = socket.socket(gate_connection_settings.socket_family, 278 # gate_connection_settings.socket_type, 279 # gate_connection_settings.socket_protocol, 280 # gate_connection_settings.socket_fileno) 281 # gate.setblocking(0) 282 # # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 283 # except (socket.error, OSError) as err: 284 # gate = None 285 # if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 286 # err.errno, err.strerror)) 287 # raise InternalNotCriticalError() 288 # 289 # if self.reuse_gate_port: 290 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 291 # 292 # try: 293 # self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 294 # gate.bind(gate_connection_settings.socket_address) 295 # except (socket.error, OSError) as err: 296 # gate.close() 297 # gate = None 298 # if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 299 # gate_connection_settings.socket_address, err.errno, err.strerror)) 300 # raise InternalNotCriticalError() 301 # try: 302 # gate.listen(backlog) 303 # except (socket.error, OSError) as err: 304 # gate.close() 305 # gate = None 306 # if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 307 # gate_connection_settings.socket_address, err.errno, err.strerror)) 308 # raise InternalNotCriticalError() 309 # 310 # if self.reuse_gate_addr: 311 # gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 312 # 313 # self._input_check_sockets.add(gate) 314 # self._exception_check_sockets.add(gate) 315 # except InternalNotCriticalError as err: 316 # gate = None 317 # finally: 318 # if gate: 319 # self._gate.add(gate) 320 # if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 321 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 322 # elif socket.AF_UNIX == gate_connection_settings.socket_family: 323 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 324 # else: 325 # self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 326 # self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 327 # self._connection_settings_by_gate_conn[gate] = gate_connection_settings 328 # self._we_have_connections_for_select = True 329 # new_connection_settings.add(gate_connection_settings) 330 # else: 331 # self.faulty_connection_settings.add(gate_connection_settings) 332 self.gates_connections_settings = new_connection_settings 333 334 return len(self.gates_connections_settings)
336 def add_gate(self, gate_connection_settings: ConnectionSettings): 337 gate_connection_settings.check() 338 339 gate = None 340 try: 341 try: 342 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 343 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 344 gate.setblocking(0) 345 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 346 except (socket.error, OSError) as err: 347 gate = None 348 if __debug__: 349 self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 350 err.errno, err.strerror)) 351 raise InternalNotCriticalError() 352 353 if self.reuse_gate_port: 354 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 355 356 try: 357 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 358 gate.bind(gate_connection_settings.socket_address) 359 except (socket.error, OSError) as err: 360 gate.close() 361 gate = None 362 if __debug__: 363 self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 364 gate_connection_settings.socket_address, err.errno, err.strerror)) 365 raise InternalNotCriticalError() 366 try: 367 gate.listen(gate_connection_settings.backlog) 368 except (socket.error, OSError) as err: 369 gate.close() 370 gate = None 371 if __debug__: 372 self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 373 gate_connection_settings.socket_address, err.errno, err.strerror)) 374 raise InternalNotCriticalError() 375 376 if self.reuse_gate_addr: 377 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 378 379 self._input_check_sockets.add(gate) 380 self._exception_check_sockets.add(gate) 381 except InternalNotCriticalError as err: 382 gate = None 383 finally: 384 if gate: 385 self._gate.add(gate) 386 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 387 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 388 elif socket.AF_UNIX == gate_connection_settings.socket_family: 389 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 390 else: 391 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 392 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 393 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 394 self._we_have_connections_for_select = True 395 return gate_connection_settings 396 else: 397 self.faulty_connection_settings.add(gate_connection_settings) 398 return None
412 def close(self): 413 for gate in self._gate: 414 gate.close() 415 416 if gate in self._input_check_sockets: 417 self._input_check_sockets.remove(gate) 418 if gate in self._exception_check_sockets: 419 self._exception_check_sockets.remove(gate) 420 421 if not self._input_check_sockets: 422 self._we_have_connections_for_select = False 423 self._unlink_good_af_unix_sockets()
430 def add_client(self, expected_client_info: Client): 431 """ 432 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 433 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 434 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 435 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 436 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 437 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 438 439 :param expected_client_info: link to Client() 440 :return: expected_client_id 441 """ 442 if (expected_client_info.connection_settings.keyword is None) \ 443 and (ConnectionType.active_accepted == expected_client_info.connection_settings.connection_type): 444 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 445 446 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 447 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 448 expected_client_info.connection_settings.keyword)) 449 450 expected_client_info.id = self._expected_clients_id_gen() 451 452 if self.unexpected_clients_are_allowed: 453 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 454 # клиент уже подключен 455 unexpected_client_id = self._keywords_of_unexpected_clients[ 456 expected_client_info.connection_settings.keyword] 457 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 458 connection_info = expected_client_info._Client__connection 459 # connection_info = self._connections.get(expected_client_info.connection_id) 460 if (unexpected_client_info.connection_settings.connection_type == 461 expected_client_info.connection_settings.connection_type) and \ 462 (ConnectionType.active_accepted == unexpected_client_info.connection_settings.connection_type): 463 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 464 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 465 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 466 # подключение. 467 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 468 # отключен и соединение будет установлено в новом сокете. 469 expected_client_info.connection_id = unexpected_client_info.connection_id 470 expected_client_info._Client__connection = \ 471 unexpected_client_info._Client__connection 472 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 473 connection_info.connected_expected_client_id = expected_client_info.id 474 connection_info.connected_expected_client = expected_client_info 475 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 476 else: 477 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 478 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 479 # Необходимо его отключить. 480 self._mark_connection_to_be_closed_immediately(connection_info) 481 self._mark_connection_as_ready_for_deletion(connection_info) 482 self._remove_unexpected_client(unexpected_client_id) 483 else: 484 # клиент еще не подключен 485 expected_client_info.connection_id = None 486 expected_client_info._Client__connection = None 487 488 if ConnectionType.active_connected == expected_client_info.connection_settings.connection_type: 489 self._connect_to_super_server(expected_client_info) 490 491 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 492 self._expected_clients[expected_client_info.id] = expected_client_info 493 494 return expected_client_info.id
Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - перед закрытием и уничтожением сервера) цикл обработки io_iteration().
:param expected_client_info: link to Client() :return: expected_client_id
496 def get_client_id_by_keyword(self, expected_client_keyword): 497 """ 498 :param expected_client_keyword: expected_client_keyword 499 :return: link to Client() 500 """ 501 return self._keywords_for_expected_clients[expected_client_keyword]
:param expected_client_keyword: expected_client_keyword :return: link to Client()
503 def get_client_info(self, expected_client_id): 504 """ 505 :param expected_client_id: expected_client_id 506 :return: link to Client() 507 """ 508 return self._expected_clients[expected_client_id]
:param expected_client_id: expected_client_id :return: link to Client()
510 def get_connection_input_fifo_size_for_client(self, expected_client_id): 511 expected_client_info = self._expected_clients[expected_client_id] 512 connection_info = expected_client_info._Client__connection 513 # connection_info = self._connections.get(expected_client_info.connection_id) 514 if connection_info is None: 515 raise Exception('Expected client was not connected yet!') 516 # if client_info.this_is_raw_connection: 517 # if client_info.input_from_client.size(): 518 # return 1 519 # else: 520 # return 0 521 # else: 522 # return client_info.input_from_client.size() 523 return connection_info.input_from_client.size()
525 def get_message_from_client(self, expected_client_id): 526 expected_client_info = self._expected_clients[expected_client_id] 527 connection_info = expected_client_info._Client__connection 528 # connection_info = self._connections.get(expected_client_info.connection_id) 529 if connection_info is None: 530 raise Exception('Expected client was not connected yet!') 531 if not connection_info.input_from_client.size(): 532 raise Exception('There is no readable data in expected client\'s FIFO!') 533 # if client_info.this_is_raw_connection: 534 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 535 return connection_info.input_from_client.get()
538 def get_messages_from_client(self, expected_client_id): 539 expected_client_info = self._expected_clients[expected_client_id] 540 connection_info = expected_client_info._Client__connection 541 # connection_info = self._connections.get(expected_client_info.connection_id) 542 if connection_info is None: 543 raise Exception('Expected client was not connected yet!') 544 try: 545 while True: 546 yield connection_info.input_from_client.get() 547 except FIFOIsEmpty: 548 pass 549 # while client_info.input_from_client.size(): 550 # yield client_info.input_from_client.get()
552 def get_connection_output_fifo_size_for_client(self, expected_client_id): 553 expected_client_info = self._expected_clients[expected_client_id] 554 connection_info = expected_client_info._Client__connection 555 # connection_info = self._connections.get(expected_client_info.connection_id) 556 if connection_info is None: 557 raise Exception('Expected client was not connected yet!') 558 return connection_info.output_to_client.size()
560 def send_message_to_client(self, expected_client_id, data): 561 # data = bytes(data) 562 expected_client_info = self._expected_clients[expected_client_id] 563 connection_info = expected_client_info._Client__connection 564 # connection_info = self._connections.get(expected_client_info.connection_id) 565 if connection_info is None: 566 raise Exception('Expected client was not connected yet!') 567 if connection_info.this_is_raw_connection: 568 self._send_message_through_connection_raw(connection_info, data) 569 else: 570 self._send_message_through_connection(connection_info, data)
572 def send_messages_to_client(self, expected_client_id, messages_list): 573 # data = bytes(data) 574 expected_client_info = self._expected_clients[expected_client_id] 575 connection_info = expected_client_info._Client__connection 576 # connection_info = self._connections.get(expected_client_info.connection_id) 577 if connection_info is None: 578 raise Exception('Expected client was not connected yet!') 579 if connection_info.this_is_raw_connection: 580 self._send_messages_through_connection_raw(connection_info, messages_list) 581 else: 582 self._send_messages_through_connection(connection_info, messages_list)
584 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 585 expected_client_info = self._expected_clients[expected_client_id] 586 connection_info = expected_client_info._Client__connection 587 # connection_info = self._connections.get(expected_client_info.connection_id) 588 if connection_info is None: 589 raise Exception('Expected client was not connected yet!') 590 return connection_info.this_is_raw_connection
592 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 593 expected_client_info = self._expected_clients[expected_client_id] 594 connection_info = expected_client_info._Client__connection 595 # connection_info = self._connections.get(expected_client_info.connection_id) 596 if connection_info is None: 597 raise Exception('Expected client was not connected yet!') 598 connection_info.this_is_raw_connection = is_raw
600 def set_inline_processor_for_client(self, expected_client_id, 601 class_for_unknown_clients_inline_processing: type): 602 expected_client_info = self._expected_clients[expected_client_id] 603 connection_info = expected_client_info._Client__connection 604 # connection_info = self._connections.get(expected_client_info.connection_id) 605 if connection_info is None: 606 raise Exception('Expected client was not connected yet!') 607 self._set_inline_processor_for_client(connection_info, expected_client_info, 608 class_for_unknown_clients_inline_processing)
610 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 611 """ 612 Connection will be closed immediately (inside this method) 613 :param expected_client_id: 614 :param raise_if_already_closed: 615 :return: 616 """ 617 if __debug__: 618 self._log('CLOSE EXPECTED CLIENT SOCKET:') 619 expected_client_info = self._expected_clients[expected_client_id] 620 connection_info = expected_client_info._Client__connection 621 # connection_info = self._connections.get(expected_client_info.connection_id) 622 if raise_if_already_closed and (connection_info is None): 623 raise Exception('Expected client was not connected yet!') 624 self.close_connection(connection_info.id)
Connection will be closed immediately (inside this method) :param expected_client_id: :param raise_if_already_closed: :return:
626 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 627 raise_if_already_closed=True): 628 """ 629 Connection will be closed immediately (inside main IO loop) 630 :param expected_client_id: 631 :param raise_if_already_closed: 632 :return: 633 """ 634 if __debug__: 635 self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 636 expected_client_info = self._expected_clients[expected_client_id] 637 connection_info = expected_client_info._Client__connection 638 # connection_info = self._connections.get(expected_client_info.connection_id) 639 if raise_if_already_closed and (connection_info is None): 640 raise Exception('Expected client was not connected yet!') 641 self._mark_connection_to_be_closed_immediately(connection_info)
Connection will be closed immediately (inside main IO loop) :param expected_client_id: :param raise_if_already_closed: :return:
643 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 644 """ 645 Connection will be closed when all output will be sent (inside main IO loop). 646 :param expected_client_id: 647 :param raise_if_already_closed: 648 :return: 649 """ 650 if __debug__: 651 self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 652 expected_client_info = self._expected_clients[expected_client_id] 653 connection_info = expected_client_info._Client__connection 654 # connection_info = self._connections.get(expected_client_info.connection_id) 655 if raise_if_already_closed and (connection_info is None): 656 raise Exception('Expected client was not connected yet!') 657 self._mark_connection_as_ready_to_be_closed(connection_info)
Connection will be closed when all output will be sent (inside main IO loop). :param expected_client_id: :param raise_if_already_closed: :return:
659 def remove_client(self, expected_client_id): 660 if __debug__: 661 self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 662 expected_client_info = self._expected_clients[expected_client_id] 663 if __debug__: 664 self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 665 connection_id = expected_client_info.connection_id 666 if connection_id is None: 667 self._remove_client(expected_client_id) 668 self.remove_connection(connection_id)
670 def add_connection(self, conn, address): 671 """ 672 :param conn: socket 673 :param address: address 674 :return: client ID 675 """ 676 if conn is None: 677 raise TypeError('conn should not be None!') 678 679 conn.setblocking(0) 680 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 681 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 682 683 new_client_id = self._connections_id_gen() 684 685 client_info = Connection(new_client_id, (conn, address), self) 686 self._connections[new_client_id] = client_info 687 self._connection_by_conn[conn] = new_client_id 688 self._input_check_sockets.add(conn) 689 self._exception_check_sockets.add(conn) 690 self._we_have_connections_for_select = True 691 692 self._unconfirmed_clients.add(conn) 693 694 return new_client_id
:param conn: socket :param address: address :return: client ID
696 def close_connection(self, connection_id): 697 if __debug__: 698 self._log('CLOSE CLIENT {}:'.format(connection_id)) 699 client_info = self._connections[connection_id] 700 if not client_info.conn.existence: 701 if __debug__: 702 self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 703 return 704 conn = client_info.conn.result 705 if conn is None: 706 if __debug__: 707 self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 708 return 709 710 conn.close() 711 client_info.conn.existence = False 712 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 713 # memory even before destroying 714 715 if connection_id in self._connections_marked_as_ready_to_be_closed: 716 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 717 if conn in self._connections_marked_to_be_closed_immediately: 718 self._connections_marked_to_be_closed_immediately.remove(conn) 719 if conn in self._connection_by_conn: 720 del self._connection_by_conn[conn] 721 if conn in self._input_check_sockets: 722 self._input_check_sockets.remove(conn) 723 if conn in self._output_check_sockets: 724 self._output_check_sockets.remove(conn) 725 if conn in self._exception_check_sockets: 726 self._exception_check_sockets.remove(conn) 727 728 if not self._input_check_sockets: 729 self._we_have_connections_for_select = False 730 731 if __debug__: 732 self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id))
734 def close_connection_by_conn(self, conn): 735 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 736 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 737 if __debug__: 738 self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 739 connection_id = self._connection_by_conn[conn] 740 if __debug__: 741 self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 742 self.close_connection(connection_id)
744 def remove_connection(self, connection_id): 745 # client should NOT be removed immediately after connection close (close_connection): 746 # code should do it by itself after reading all available input data 747 if __debug__: 748 self._log('REMOVE CLIENT: {}'.format(connection_id)) 749 client_info = self._connections[connection_id] 750 if __debug__: 751 self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 752 if client_info.conn.existence: 753 self.close_connection(connection_id) 754 conn = client_info.conn.result 755 if conn is None: 756 return 757 758 if conn in self._conns_of_unexpected_clients: 759 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 760 761 # if conn in self._conns_of_expected_clients: 762 # self._remove_client(self._conns_of_expected_clients[conn]) 763 if client_info.connected_expected_client_id is not None: 764 self._remove_client(client_info.connected_expected_client_id) 765 766 client_info.connected_expected_client_id = None 767 client_info.connected_expected_client = None 768 769 if connection_id in self._connections_marked_as_ready_to_be_deleted: 770 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 771 772 client_info.conn.existence = False 773 client_info.conn.result = None 774 775 del self._connections[connection_id] 776 if conn in self._connection_by_conn: 777 del self._connection_by_conn[conn] 778 if conn in self._input_check_sockets: 779 self._input_check_sockets.remove(conn) 780 if conn in self._output_check_sockets: 781 self._output_check_sockets.remove(conn) 782 if conn in self._exception_check_sockets: 783 self._exception_check_sockets.remove(conn) 784 785 # client_info.remove()
Inherited Members
- cengal.io.asock_io.versions.v_1.abstract.NetIOCallbacks
- on_accept_connection
- on_connected
- on_read
- on_write
- on_close
- cengal.io.asock_io.versions.v_1.base.ASockIOCoreMemoryManagement
- socket_read_fixed_buffer_size
- link_to
- cengal.io.asock_io.versions.v_1.base.IOCoreMemoryManagement
- global__data_size_limit
- global_in__data_size_limit
- global_in__data_full_size
- global_in__deletable_data_full_size
- global_out__data_size_limit
- global_out__data_full_size
- global_out__deletable_data_full_size
2191@contextmanager 2192def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool = False, backlog: int = 1): 2193 try: 2194 gate_connections_num = asock_io_core_obj.listen(backlog) 2195 if should_have_gate_connections and (not gate_connections_num): 2196 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!' 2197 asock_io_core_obj._log(error_text) 2198 raise Exception(error_text) 2199 else: 2200 print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num)) 2201 yield asock_io_core_obj 2202 except: 2203 raise 2204 finally: 2205 asock_io_core_obj.io_iteration() 2206 asock_io_core_obj.destroy() 2207 asock_io_core_obj.io_iteration() 2208 if __debug__: 2209 print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150])) 2210 if __debug__: 2211 print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150])) 2212 # print('RECV BUFF SIZES: {}'.format(asock_io_core_obj.recv_buff_sizes)) 2213 # print('RECV SIZES: {}'.format(asock_io_core_obj.recv_sizes))