cengal.io.asock_io.versions.v_0.tcp_app_server
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37import os 38import sys 39import traceback 40import select 41import time 42# from fileinput import input 43from typing import Set, Iterable, Optional, Tuple, Dict 44import copy 45from collections import deque 46 47from cengal.data_generation.id_generator import IDGenerator, GeneratorType 48from .base import * 49 50from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl 51from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty 52from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType 53from cengal.code_flow_control.smart_values.versions.v_0 import ResultExistence 54from contextlib import contextmanager 55from cengal.code_inspection.line_profiling import set_profiler 56from .recv_buff_size_computer import RecvBuffSizeComputer 57from math import ceil 58 59""" 60Module Docstring 61Docstrings: http://www.python.org/dev/peps/pep-0257/ 62 63CAUTION: some code here is optimized for speed - not for readability or beauty. 64""" 65 66__author__ = 'ButenkoMS <gtalk@butenkoms.space>' 67 68# set_profiler(True) 69set_profiler(False) 70 71 72class IoIterationResult: 73 """ 74 ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки 75 (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых 76 сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать 77 новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве 78 случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, 79 при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля 80 успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же 81 считывание имеющихся результатов операций))) 82 """ 83 84 def __init__(self): 85 self.newly_connected_expected_clients = set() 86 self.newly_connected_unknown_clients = set() 87 self.clients_with_disconnected_connection = set() 88 self.clients_have_data_to_read = set() 89 self.clients_with_empty_output_fifo = set() 90 91 def update(self, other): 92 self.newly_connected_expected_clients.update(other.newly_connected_expected_clients) 93 self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients) 94 self.clients_with_disconnected_connection.update( 95 other.clients_with_disconnected_connection) 96 self.clients_have_data_to_read.update(other.clients_have_data_to_read) 97 self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo) 98 99 def remove(self, item): 100 if item in self.newly_connected_expected_clients: 101 self.newly_connected_expected_clients.remove(item) 102 if item in self.newly_connected_unknown_clients: 103 self.newly_connected_unknown_clients.remove(item) 104 if item in self.clients_with_disconnected_connection: 105 self.clients_with_disconnected_connection.remove(item) 106 if item in self.clients_have_data_to_read: 107 self.clients_have_data_to_read.remove(item) 108 if item in self.clients_with_empty_output_fifo: 109 self.clients_with_empty_output_fifo.remove(item) 110 111 def clear(self): 112 self.newly_connected_expected_clients.clear() 113 self.newly_connected_unknown_clients.clear() 114 self.clients_with_disconnected_connection.clear() 115 self.clients_have_data_to_read.clear() 116 self.clients_with_empty_output_fifo.clear() 117 118 119class ASockIOCoreMemoryManagement(IOCoreMemoryManagement): 120 def __init__(self): 121 super(ASockIOCoreMemoryManagement, self).__init__() 122 123 self.socket_read_fixed_buffer_size = ResultExistence(True, 124 1024 ** 2) # 1024**2 is the fastest fixed read buffer. 125 126 def link_to(self, parent): 127 super(ASockIOCoreMemoryManagement, self).link_to(parent) 128 try: 129 self.socket_read_fixed_buffer_size = parent.socket_read_fixed_buffer_size 130 except AttributeError: 131 pass 132 133 134class Connection: 135 def __init__(self, 136 connection_id=None, 137 connection__conn_addr: tuple = None, 138 global_memory_management: ASockIOCoreMemoryManagement = None 139 ): 140 """ 141 142 :param connection_id: ID for this connection 143 :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address 144 :param global_memory_management: global memory management obj 145 """ 146 self.id = connection_id 147 if connection__conn_addr is None: 148 self.conn = ResultExistence(False, None) 149 self.addr = ResultExistence(False, None) 150 else: 151 self.conn = ResultExistence(True, connection__conn_addr[0]) 152 self.addr = ResultExistence(True, connection__conn_addr[1]) 153 154 self.addr_info = None 155 self.host_names = None 156 157 self.recv_buff_size_computer = RecvBuffSizeComputer() 158 self.recv_buff_size = 0 159 self.calc_new_recv_buff_size(0) 160 161 self.should_be_closed = False # socket should be closed immediately. For example because of IO error. 162 self.ready_to_be_closed = False # socket should be closed, after all messages had been sent to client. 163 self.ready_for_deletion = False # connection should be deleted immediately. For example because of unexpected 164 # keyword. 165 166 self.keyword = None 167 168 self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl( 169 external_data_length=global_memory_management.global_in__data_full_size) 170 self.current_message_length = None # length of current input message (or None, if size waw not read yet) 171 self.input_from_client = FIFODequeWithLengthControl( 172 external_data_full_size=global_memory_management.global_in__data_full_size) 173 self.current_memoryview_output = None 174 self.current_memoryview_input = None 175 self.output_to_client = FIFODequeWithLengthControl( 176 external_data_full_size=global_memory_management.global_out__data_full_size) 177 178 self.this_is_raw_connection = False 179 180 self.connected_expected_client_id = None 181 self.connected_expected_client = None 182 self.has_inline_processor = False 183 184 def calc_new_recv_buff_size(self, last_recv_amount): 185 self.recv_buff_size = self.recv_buff_size_computer.calc_new_recv_buff_size(last_recv_amount) 186 187 def remove(self): 188 self.raw_input_from_client.remove() 189 self.input_from_client.remove() 190 self.output_to_client.remove() 191 192 193class InlineProcessor: 194 __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names', 195 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately', 196 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages', 197 '__hold__client_id') 198 199 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 200 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 201 """ 202 203 :param keyword: client keyword. You may check for a known keywords to act appropriately 204 :param socket_family: 205 :param socket_type: 206 :param socket_proto: 207 :param addr_info: result of socket.getaddrinfo() call 208 :param host_names: result of socket.gethostbyaddr() call 209 """ 210 self.client_id = client_id 211 self.keyword = keyword 212 self.socket_family = socket_family 213 self.socket_type = socket_type 214 self.socket_proto = socket_proto 215 self.addr_info = addr_info 216 self.host_names = host_names 217 self.is_in_raw_mode = None 218 self.__hold__client_id = client_id 219 self.__set__is_in_raw_mode = False 220 self.__set__mark_socket_as_should_be_closed_immediately = False 221 self.__set__mark_socket_as_ready_to_be_closed = False 222 self.__external_parameters_set_trigger = external_parameters_set_trigger 223 224 # self.output_messages = FIFODequeWithLengthControl() 225 self.output_messages = deque() 226 # self.output_messages = list() 227 228 def on__data_received(self, data: bytes): 229 """ 230 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 231 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 232 be logged 233 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 234 """ 235 pass 236 237 def on__output_buffers_are_empty(self): 238 """ 239 Will be called immediately when all output data was send. 240 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 241 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 242 be logged 243 """ 244 pass 245 246 def on__connection_lost(self): 247 """ 248 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 249 Situation with unhandled exception will be logged. 250 """ 251 pass 252 253 def set__is_in_raw_mode(self, is_in_raw_mode: bool): 254 self.__set__is_in_raw_mode = is_in_raw_mode 255 self.__external_parameters_set_trigger.add(self.__hold__client_id) 256 257 def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool): 258 self.__set__mark_socket_as_should_be_closed_immediately = mark_socket_as 259 self.__external_parameters_set_trigger.add(self.__hold__client_id) 260 261 def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool): 262 self.__set__mark_socket_as_ready_to_be_closed = mark_socket_as 263 self.__external_parameters_set_trigger.add(self.__hold__client_id) 264 265 266class Client: 267 def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None): 268 """ 269 270 :param client_id: ID of the expected client 271 :param client_tcp_id: ID of the connection 272 :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client, 273 and all - for the super server. 274 """ 275 self.id = client_id 276 self.connection_id = client_tcp_id 277 self.__connection = None # type: Optional[Connection] 278 self.connection_settings = connection_settings 279 self.connection_settings.check() 280 281 self.will_use_raw_client_connection = False 282 self.will_use_raw_connection_without_handshake = False 283 self.this_is_unknown_client = False 284 285 self.obj_for_inline_processing = None 286 287 def get_connection(self)->Connection: 288 return self.__connection 289 290 291class ASockIOCore(ASockIOCoreMemoryManagement): 292 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 293 """ 294 Port should not be open to a external world! 295 :param gates_connections_settings: set() of ConnectionSettings() 296 :return: 297 """ 298 super(ASockIOCore, self).__init__() 299 300 if os.name != 'nt': 301 self.po = select.poll() 302 self.last_all_sockets = set() # type: Set[int] 303 self.socket_by_fd = dict() # type: Dict[int, socket.socket] 304 305 self.check_sockets_sum_time = 0.0 306 self.check_sockets_qnt = 0 307 self.check_sockets_max_time = 0.0 308 309 self.gates_connections_settings = gates_connections_settings 310 if not self.gates_connections_settings: 311 self.gates_connections_settings = set() 312 # raise Exception('gates_connections_settings should be provided!') 313 for gates_connections_settings in self.gates_connections_settings: 314 gates_connections_settings.check() 315 self.faulty_connection_settings = set() 316 self._connection_settings_by_gate_conn = dict() 317 318 self.set_of_gate_addresses = set() 319 self._gate = set() 320 self.reuse_gate_addr = False 321 self.reuse_gate_port = False 322 323 self.message_size_len = MESSAGE_SIZE_LEN 324 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 325 326 self._connections = dict() # key: ID; data: Connection() 327 self._connection_by_conn = dict() # key: conn; data: ID 328 self._connections_id_gen = IDGenerator() 329 330 self._connections_marked_as_ready_to_be_closed = set() 331 self._connections_marked_to_be_closed_immediately = set() 332 self._connections_marked_as_ready_to_be_deleted = set() 333 334 self._unconfirmed_clients = set() 335 336 self._we_have_connections_for_select = False 337 self._input_check_sockets = set() 338 self._output_check_sockets = set() 339 self._exception_check_sockets = set() 340 341 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 342 self._expected_clients = dict() # key: ID; data: Client() 343 self._expected_clients_id_gen = IDGenerator() 344 self._keywords_for_expected_clients = dict() # key: keyword; data: info 345 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 346 347 self.unexpected_clients_are_allowed = True 348 349 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 350 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 351 self._unexpected_clients = dict() # key: ID; data: Client() 352 self._unexpected_clients_id_gen = IDGenerator() 353 self._keywords_of_unexpected_clients = dict() 354 self._conns_of_unexpected_clients = dict() 355 356 self._io_iteration_result = IoIterationResult() 357 358 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 359 self.need_to_auto_check_incoming_raw_connection = False 360 self.unknown_clients_are_allowed = False 361 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 362 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 363 364 self.echo_log = False 365 self._internal_log = deque() 366 367 self.recv_sizes = deque() 368 self.recv_buff_sizes = deque() 369 370 self.should_get_client_addr_info_on_connection = True 371 372 self.use_nodelay_inet = False 373 self.use_speed_optimized_socket_read = False 374 375 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 376 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 377 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 378 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 379 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 380 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 381 382 self.class_for_unknown_clients_inline_processing = None 383 384 self._clients_with_inline_processors_that_need_to_apply_parameters = set() 385 386 @staticmethod 387 def check_sockets_select(read: Set[int], write: Set[int], error: Set[int], 388 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 389 all_sockets = read | write | error 390 if all_sockets: 391 return select.select(read, 392 write, 393 error, 394 timeout) 395 else: 396 return set(), set(), set() 397 398 def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int], 399 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 400 read_events = select.POLLIN | select.POLLPRI 401 write_events = select.POLLOUT 402 except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL 403 if hasattr(select, 'POLLRDHUP'): 404 except_events |= select.POLLRDHUP 405 readable_events = {select.POLLIN, select.POLLPRI} 406 writable_events = {select.POLLOUT} 407 exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL} 408 if hasattr(select, 'POLLRDHUP'): 409 exceptional_events.add(select.POLLRDHUP) 410 all_events_set = readable_events | writable_events | exceptional_events 411 412 timeout = int(timeout * 1000) 413 414 # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets)) 415 all_sockets = read | write | error 416 # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets)) 417 new_sockets = all_sockets - self.last_all_sockets 418 # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets)) 419 still_sockets = all_sockets & self.last_all_sockets 420 # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets)) 421 deleted_sockets = self.last_all_sockets - all_sockets 422 # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets)) 423 self.last_all_sockets = all_sockets 424 425 for socket_fd in new_sockets: 426 event_mask = 0 427 if socket_fd in read: 428 event_mask |= read_events 429 if socket_fd in write: 430 event_mask |= write_events 431 if socket_fd in error: 432 event_mask |= except_events 433 # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 434 self.po.register(socket_fd, event_mask) 435 436 for socket_fd in still_sockets: 437 event_mask = 0 438 if socket_fd in read: 439 event_mask |= read_events 440 if socket_fd in write: 441 event_mask |= write_events 442 if socket_fd in error: 443 event_mask |= except_events 444 # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 445 self.po.modify(socket_fd, event_mask) 446 447 for socket_fd in deleted_sockets: 448 # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd)) 449 self.po.unregister(socket_fd) 450 451 poll_result = self.po.poll(timeout) 452 # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result)) 453 # sys.stdout.flush() 454 455 readable = set() 456 writable = set() 457 exceptional = set() 458 for socket_fd, event_mask in poll_result: 459 socket_events_set = set() 460 for another_event in all_events_set: 461 if event_mask & another_event: 462 socket_events_set.add(another_event) 463 464 if socket_events_set & readable_events: 465 readable.add(socket_fd) 466 if socket_events_set & writable_events: 467 writable.add(socket_fd) 468 if socket_events_set & exceptional_events: 469 exceptional.add(socket_fd) 470 471 return readable, writable, exceptional 472 473 def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket], 474 timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]: 475 all_sockets = read | write | error 476 if all_sockets: 477 read_fd = set() 478 write_fd = set() 479 error_fd = set() 480 for conn in read: 481 read_fd.add(conn.fileno()) 482 for conn in write: 483 write_fd.add(conn.fileno()) 484 for conn in error: 485 error_fd.add(conn.fileno()) 486 487 check_sockets = self.check_sockets_select 488 if os.name != 'nt': 489 check_sockets = self.check_sockets_poll 490 491 readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd, 492 write_fd, 493 error_fd, 494 timeout) 495 readable = set() 496 writable = set() 497 exceptional = set() 498 for fd in readable_fd: 499 readable.add(self.socket_by_fd[fd]) 500 for fd in writable_fd: 501 writable.add(self.socket_by_fd[fd]) 502 for fd in exceptional_fd: 503 exceptional.add(self.socket_by_fd[fd]) 504 return readable, writable, exceptional 505 else: 506 return set(), set(), set() 507 508 def gate_io_iteration(self, timeout=0.0): 509 result = self._io_iteration_result 510 if self._gate: 511 readable, writable, exceptional = self.check_sockets_select(self._gate, 512 set(), 513 set(), 514 timeout) 515 516 # Handle inputs 517 for s in readable: 518 self._read_data_from_socket(s) 519 520 self._io_iteration_result = IoIterationResult() 521 return result 522 523 # @profile 524 def io_iteration(self, timeout=0.0): 525 """ 526 527 :param timeout: timeout in seconds 528 :return: 529 """ 530 result = self._io_iteration_result 531 532 if self._we_have_connections_for_select: 533 # need_to_process = False 534 # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets 535 # if not (all_sockets - self._gate): 536 # timeout = 0.01 537 538 need_to_repeat = True 539 540 while need_to_repeat: 541 output_check_sockets = set() 542 543 # Is need to check writable sockets 544 need_to_check_writable_sockets = False 545 for s in self._output_check_sockets: 546 curr_client_info = self._connections[self._connection_by_conn[s]] 547 if curr_client_info.output_to_client.size(): 548 need_to_check_writable_sockets = True 549 break 550 551 if need_to_check_writable_sockets: 552 output_check_sockets = self._output_check_sockets 553 554 # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(), 555 # len(self._input_check_sockets), 556 # len(self._output_check_sockets), 557 # len(self._exception_check_sockets))) 558 # sys.stdout.flush() 559 check_sockets_start_time = time.perf_counter() 560 readable, writable, exceptional = self.check_sockets(self._input_check_sockets, 561 output_check_sockets, 562 self._exception_check_sockets, 563 timeout) 564 check_sockets_finish_time = time.perf_counter() 565 check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time 566 self.check_sockets_sum_time += check_sockets_delta_time 567 self.check_sockets_qnt += 1 568 if self.check_sockets_max_time < check_sockets_delta_time: 569 self.check_sockets_max_time = check_sockets_delta_time 570 check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt 571 # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format( 572 # check_sockets_delta_time, 573 # check_socket_average_time, 574 # self.check_sockets_sum_time, 575 # self.check_sockets_max_time 576 # )) 577 # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(), 578 # len(readable), 579 # len(writable), 580 # len(exceptional))) 581 # sys.stdout.flush() 582 583 read_is_forbidden = True 584 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 585 <= self.global_in__data_size_limit.result: 586 read_is_forbidden = False 587 588 # Handle inputs 589 for s in readable: 590 read_result = self._read_data_from_socket(s) 591 if read_result: 592 if s in self._unconfirmed_clients: 593 self._process_client_keyword(s) 594 self._check_is_client_have_data_to_read_in_fifo(s) 595 else: 596 self._client_have_data_to_read_in_fifo(s) 597 598 if __debug__: 599 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 600 read_is_forbidden) 601 if read_is_forbidden_test is not None: 602 if read_is_forbidden_test: 603 print('Read is suppressed until data will be processed.') 604 else: 605 print('Read is allowed: data is processed.') 606 607 # Handle outputs 608 for s in writable: 609 curr_client_info = self._connections[self._connection_by_conn[s]] 610 self._write_data_to_socket(curr_client_info) 611 # self._write_data_to_socket(s) 612 613 # Handle "exceptional conditions" 614 for s in exceptional: 615 self._handle_connection_error(s) 616 617 # Set parameters for inline processors 618 if self._clients_with_inline_processors_that_need_to_apply_parameters: 619 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 620 expected_client_info = self._expected_clients[ec_id] 621 connection_info = expected_client_info._Client__connection 622 self._inline_processor__apply_parameters(connection_info, expected_client_info) 623 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 624 625 # Close sockets 626 if self._connections_marked_to_be_closed_immediately: 627 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 628 self._connections_marked_to_be_closed_immediately = set() 629 for closeable_socket in sockets_should_be_closed_immediately: 630 connection_id = self._connection_by_conn[closeable_socket] 631 # self.close_connection_by_conn(closeable_socket) 632 self.close_connection(connection_id) 633 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 634 635 # Removing clients 636 if self._connections_marked_as_ready_to_be_deleted: 637 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 638 self._connections_marked_as_ready_to_be_deleted = set() 639 for faulty_socket in clients_ready_to_be_deleted: 640 self.remove_connection_by_conn(faulty_socket) 641 642 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 643 <= self.global_out__data_size_limit.result: 644 need_to_repeat = False 645 else: 646 need_to_repeat = True 647 648 need_to_repeat = False 649 650 if __debug__: 651 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 652 need_to_repeat) 653 if need_to_repeat_show is not None: 654 if need_to_repeat_show: 655 print('Work is suppressed until data will be out.') 656 else: 657 print('Work is allowed: data is out.') 658 659 self._io_iteration_result = IoIterationResult() 660 return result 661 662 def listen(self, backlog=1): 663 # backlog = backlog or 1 664 665 new_connection_settings = set() 666 for gate_connection_settings in self.gates_connections_settings: 667 gate = None 668 try: 669 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 670 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 671 self.socket_by_fd[gate.fileno()] = gate 672 gate.setblocking(0) 673 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 674 except (socket.error, OSError) as err: 675 gate = None 676 if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 677 err.errno, err.strerror)) 678 continue 679 680 if self.reuse_gate_port: 681 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 682 683 try: 684 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 685 gate.bind(gate_connection_settings.socket_address) 686 except (socket.error, OSError) as err: 687 del self.socket_by_fd[gate.fileno()] 688 gate.close() 689 gate = None 690 if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 691 gate_connection_settings.socket_address, err.errno, err.strerror)) 692 continue 693 try: 694 gate.listen(backlog) 695 except (socket.error, OSError) as err: 696 del self.socket_by_fd[gate.fileno()] 697 gate.close() 698 gate = None 699 if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 700 gate_connection_settings.socket_address, err.errno, err.strerror)) 701 continue 702 703 if self.reuse_gate_addr: 704 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 705 706 self._input_check_sockets.add(gate) 707 self._exception_check_sockets.add(gate) 708 709 if gate: 710 self._gate.add(gate) 711 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 712 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 713 elif socket.AF_UNIX == gate_connection_settings.socket_family: 714 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 715 else: 716 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 717 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 718 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 719 self._we_have_connections_for_select = True 720 new_connection_settings.add(gate_connection_settings) 721 else: 722 self.faulty_connection_settings.add(gate_connection_settings) 723 self.gates_connections_settings = new_connection_settings 724 725 return len(self.gates_connections_settings) 726 727 def close_all_connections(self): 728 if __debug__: self._log('CLOSE ALL CONNECTIONS:') 729 clients_list = dict(self._connections) 730 for connection_id, client_info in clients_list.items(): 731 self.close_connection(connection_id) 732 733 def remove_all_connections(self): 734 clients_list = dict(self._connections) 735 for connection_id, client_info in clients_list.items(): 736 self.remove_connection(connection_id) 737 738 def close(self): 739 for gate in self._gate: 740 del self.socket_by_fd[gate.fileno()] 741 gate.close() 742 743 if gate in self._input_check_sockets: 744 self._input_check_sockets.remove(gate) 745 if gate in self._exception_check_sockets: 746 self._exception_check_sockets.remove(gate) 747 748 if not self._input_check_sockets: 749 self._we_have_connections_for_select = False 750 self._unlink_good_af_unix_sockets() 751 752 def destroy(self): 753 self.close() 754 self.close_all_connections() 755 self.remove_all_connections() 756 757 def add_client(self, expected_client_info: Client): 758 """ 759 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 760 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 761 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 762 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 763 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 764 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 765 :param expected_client_info: link to Client() 766 :return: expected_client_id 767 """ 768 if (expected_client_info.connection_settings.keyword is None) \ 769 and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role): 770 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 771 772 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 773 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 774 expected_client_info.connection_settings.keyword)) 775 776 expected_client_info.id = self._expected_clients_id_gen() 777 778 if self.unexpected_clients_are_allowed: 779 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 780 # клиент уже подключен 781 unexpected_client_id = self._keywords_of_unexpected_clients[ 782 expected_client_info.connection_settings.keyword] 783 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 784 connection_info = expected_client_info._Client__connection 785 if ( 786 unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \ 787 (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role): 788 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 789 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 790 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 791 # подключение. 792 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 793 # отключен и соединение будет установлено в новом сокете. 794 expected_client_info.connection_id = unexpected_client_info.connection_id 795 expected_client_info._Client__connection = \ 796 unexpected_client_info._Client__connection 797 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 798 connection_info.connected_expected_client_id = expected_client_info.id 799 connection_info.connected_expected_client = expected_client_info 800 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 801 else: 802 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 803 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 804 # Необходимо его отключить. 805 self._mark_connection_to_be_closed_immediately(connection_info) 806 self._mark_connection_as_ready_for_deletion(connection_info) 807 self._remove_unexpected_client(unexpected_client_id) 808 else: 809 # клиент еще не подключен 810 expected_client_info.connection_id = None 811 expected_client_info._Client__connection = None 812 813 if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role: 814 self._connect_to_super_server(expected_client_info) 815 816 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 817 self._expected_clients[expected_client_info.id] = expected_client_info 818 819 return expected_client_info.id 820 821 def get_client_id_by_keyword(self, expected_client_keyword): 822 """ 823 :param expected_client_keyword: expected_client_keyword 824 :return: link to Client() 825 """ 826 return self._keywords_for_expected_clients[expected_client_keyword] 827 828 def get_client_info(self, expected_client_id): 829 """ 830 :param expected_client_id: expected_client_id 831 :return: link to Client() 832 """ 833 return self._expected_clients[expected_client_id] 834 835 def get_connection_input_fifo_size_for_client(self, expected_client_id): 836 expected_client_info = self._expected_clients[expected_client_id] 837 connection_info = expected_client_info._Client__connection 838 if connection_info is None: 839 raise Exception('Expected client was not connected yet!') 840 # if client_info.this_is_raw_connection: 841 # if client_info.input_from_client.size(): 842 # return 1 843 # else: 844 # return 0 845 # else: 846 # return client_info.input_from_client.size() 847 return connection_info.input_from_client.size() 848 849 def get_message_from_client(self, expected_client_id): 850 expected_client_info = self._expected_clients[expected_client_id] 851 connection_info = expected_client_info._Client__connection 852 if connection_info is None: 853 raise Exception('Expected client was not connected yet!') 854 if not connection_info.input_from_client.size(): 855 raise Exception('There is no readable data in expected client\'s FIFO!') 856 # if client_info.this_is_raw_connection: 857 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 858 return connection_info.input_from_client.get() 859 860 # @profile 861 def get_messages_from_client(self, expected_client_id): 862 expected_client_info = self._expected_clients[expected_client_id] 863 connection_info = expected_client_info._Client__connection 864 if connection_info is None: 865 raise Exception('Expected client was not connected yet!') 866 try: 867 while True: 868 yield connection_info.input_from_client.get() 869 except FIFOIsEmpty: 870 pass 871 # while client_info.input_from_client.size(): 872 # yield client_info.input_from_client.get() 873 874 def get_connection_output_fifo_size_for_client(self, expected_client_id): 875 expected_client_info = self._expected_clients[expected_client_id] 876 connection_info = expected_client_info._Client__connection 877 if connection_info is None: 878 raise Exception('Expected client was not connected yet!') 879 return connection_info.output_to_client.size() 880 881 def send_message_to_client(self, expected_client_id, data): 882 # data = bytes(data) 883 expected_client_info = self._expected_clients[expected_client_id] 884 connection_info = expected_client_info._Client__connection 885 if connection_info is None: 886 raise Exception('Expected client was not connected yet!') 887 if connection_info.this_is_raw_connection: 888 self._send_message_through_connection_raw(connection_info, data) 889 else: 890 self._send_message_through_connection(connection_info, data) 891 892 def send_messages_to_client(self, expected_client_id, messages_list): 893 # data = bytes(data) 894 expected_client_info = self._expected_clients[expected_client_id] 895 connection_info = expected_client_info._Client__connection 896 if connection_info is None: 897 raise Exception('Expected client was not connected yet!') 898 if connection_info.this_is_raw_connection: 899 self._send_messages_through_connection_raw(connection_info, messages_list) 900 else: 901 self._send_messages_through_connection(connection_info, messages_list) 902 903 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 904 expected_client_info = self._expected_clients[expected_client_id] 905 connection_info = expected_client_info._Client__connection 906 if connection_info is None: 907 raise Exception('Expected client was not connected yet!') 908 return connection_info.this_is_raw_connection 909 910 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 911 expected_client_info = self._expected_clients[expected_client_id] 912 connection_info = expected_client_info._Client__connection 913 if connection_info is None: 914 raise Exception('Expected client was not connected yet!') 915 connection_info.this_is_raw_connection = is_raw 916 917 def set_inline_processor_for_client(self, expected_client_id, 918 class_for_unknown_clients_inline_processing: type): 919 expected_client_info = self._expected_clients[expected_client_id] 920 connection_info = expected_client_info._Client__connection 921 if connection_info is None: 922 raise Exception('Expected client was not connected yet!') 923 self._set_inline_processor_for_client(connection_info, expected_client_info, 924 class_for_unknown_clients_inline_processing) 925 926 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 927 """ 928 Connection will be closed immediately (inside this method) 929 :param expected_client_id: 930 :param raise_if_already_closed: 931 :return: 932 """ 933 if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:') 934 expected_client_info = self._expected_clients[expected_client_id] 935 connection_info = expected_client_info._Client__connection 936 if raise_if_already_closed and (connection_info is None): 937 raise Exception('Expected client was not connected yet!') 938 self.close_connection(connection_info.id) 939 940 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 941 raise_if_already_closed=True): 942 """ 943 Connection will be closed immediately (inside main IO loop) 944 :param expected_client_id: 945 :param raise_if_already_closed: 946 :return: 947 """ 948 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 949 expected_client_info = self._expected_clients[expected_client_id] 950 connection_info = expected_client_info._Client__connection 951 if raise_if_already_closed and (connection_info is None): 952 raise Exception('Expected client was not connected yet!') 953 self._mark_connection_to_be_closed_immediately(connection_info) 954 955 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 956 """ 957 Connection will be closed when all output will be sent (inside main IO loop). 958 :param expected_client_id: 959 :param raise_if_already_closed: 960 :return: 961 """ 962 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 963 expected_client_info = self._expected_clients[expected_client_id] 964 connection_info = expected_client_info._Client__connection 965 if raise_if_already_closed and (connection_info is None): 966 raise Exception('Expected client was not connected yet!') 967 self._mark_connection_as_ready_to_be_closed(connection_info) 968 969 def remove_client(self, expected_client_id): 970 if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 971 expected_client_info = self._expected_clients[expected_client_id] 972 if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 973 connection_id = expected_client_info.connection_id 974 if connection_id is None: 975 self._remove_client(expected_client_id) 976 self.remove_connection(connection_id) 977 978 def add_connection(self, conn, address): 979 """ 980 :param conn: socket 981 :param address: address 982 :return: client ID 983 """ 984 if conn is None: 985 raise TypeError('conn should not be None!') 986 987 self.socket_by_fd[conn.fileno()] = conn 988 conn.setblocking(0) 989 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 990 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 991 992 new_client_id = self._connections_id_gen() 993 994 client_info = Connection(new_client_id, (conn, address), self) 995 self._connections[new_client_id] = client_info 996 self._connection_by_conn[conn] = new_client_id 997 self._input_check_sockets.add(conn) 998 self._exception_check_sockets.add(conn) 999 self._we_have_connections_for_select = True 1000 1001 self._unconfirmed_clients.add(conn) 1002 1003 return new_client_id 1004 1005 def check_connection_existance(self, connection_id): 1006 if connection_id not in self._expected_clients: 1007 return False 1008 if connection_id not in self._connections: 1009 return False 1010 client_info = self._connections[connection_id] 1011 if not client_info.conn.existence: 1012 return False 1013 conn = client_info.conn.result 1014 if conn is None: 1015 return False 1016 return True 1017 1018 def close_connection(self, connection_id): 1019 if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id)) 1020 client_info = self._connections[connection_id] 1021 if not client_info.conn.existence: 1022 if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 1023 return 1024 conn = client_info.conn.result 1025 if conn is None: 1026 if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 1027 return 1028 1029 del self.socket_by_fd[conn.fileno()] 1030 conn.close() 1031 client_info.conn.existence = False 1032 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 1033 # memory even before destroying 1034 1035 if connection_id in self._connections_marked_as_ready_to_be_closed: 1036 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 1037 if conn in self._connections_marked_to_be_closed_immediately: 1038 self._connections_marked_to_be_closed_immediately.remove(conn) 1039 if conn in self._connection_by_conn: 1040 del self._connection_by_conn[conn] 1041 if conn in self._input_check_sockets: 1042 self._input_check_sockets.remove(conn) 1043 if conn in self._output_check_sockets: 1044 self._output_check_sockets.remove(conn) 1045 if conn in self._exception_check_sockets: 1046 self._exception_check_sockets.remove(conn) 1047 1048 if not self._input_check_sockets: 1049 self._we_have_connections_for_select = False 1050 1051 if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id)) 1052 1053 def close_connection_by_conn(self, conn): 1054 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 1055 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 1056 if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 1057 connection_id = self._connection_by_conn[conn] 1058 if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 1059 self.close_connection(connection_id) 1060 1061 def remove_connection(self, connection_id): 1062 # client should NOT be removed immediately after connection close (close_connection): 1063 # code should do it by itself after reading all available input data 1064 if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id)) 1065 client_info = self._connections[connection_id] 1066 if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 1067 if client_info.conn.existence: 1068 self.close_connection(connection_id) 1069 conn = client_info.conn.result 1070 if conn is None: 1071 return 1072 1073 if conn in self._conns_of_unexpected_clients: 1074 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 1075 1076 # if conn in self._conns_of_expected_clients: 1077 # self._remove_client(self._conns_of_expected_clients[conn]) 1078 if client_info.connected_expected_client_id is not None: 1079 self._remove_client(client_info.connected_expected_client_id) 1080 1081 client_info.connected_expected_client_id = None 1082 client_info.connected_expected_client = None 1083 1084 if connection_id in self._connections_marked_as_ready_to_be_deleted: 1085 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 1086 1087 client_info.conn.existence = False 1088 client_info.conn.result = None 1089 1090 del self._connections[connection_id] 1091 if conn in self._connection_by_conn: 1092 del self._connection_by_conn[conn] 1093 if conn in self._input_check_sockets: 1094 self._input_check_sockets.remove(conn) 1095 if conn in self._output_check_sockets: 1096 self._output_check_sockets.remove(conn) 1097 if conn in self._exception_check_sockets: 1098 self._exception_check_sockets.remove(conn) 1099 1100 # client_info.remove() 1101 1102 def remove_connection_by_conn(self, conn): 1103 connection_id = self._connection_by_conn[conn] 1104 self.remove_connection(connection_id) 1105 1106 def _log(self, log_string): 1107 self._internal_log.append(log_string) 1108 if self.echo_log: 1109 print(log_string) 1110 sys.stdout.flush() 1111 1112 def _create_unknown_client_from_connection(self, client_info: Connection): 1113 keyword = None 1114 keyword_is_ok = False 1115 while not keyword_is_ok: 1116 keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode() 1117 if keyword not in self._keywords_for_expected_clients: 1118 keyword_is_ok = True 1119 connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, 1120 socket_address=client_info.addr.result, keyword=keyword) 1121 expected_client_info = Client(connection_settings) 1122 expected_client_info.id = self._expected_clients_id_gen() 1123 1124 expected_client_info.connection_id = client_info.id 1125 expected_client_info._Client__connection = client_info 1126 expected_client_info.will_use_raw_client_connection = True 1127 expected_client_info.will_use_raw_connection_without_handshake = True 1128 expected_client_info.this_is_unknown_client = True 1129 if self.class_for_unknown_clients_inline_processing is not None: 1130 self._set_inline_processor_for_client(client_info, expected_client_info, 1131 self.class_for_unknown_clients_inline_processing) 1132 1133 self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id 1134 client_info.connected_expected_client_id = expected_client_info.id 1135 client_info.connected_expected_client = expected_client_info 1136 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 1137 self._expected_clients[expected_client_info.id] = expected_client_info 1138 1139 self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id) 1140 1141 return expected_client_info.id 1142 1143 def _set_inline_processor_for_client(self, connection_info: Connection, 1144 expected_client_info: Client, 1145 class_for_unknown_clients_inline_processing: type): 1146 assert type(class_for_unknown_clients_inline_processing) == type, \ 1147 '.class_for_unknown_clients_inline_processing must be a class or None' 1148 keyword = expected_client_info.connection_settings.keyword 1149 expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing( 1150 expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type, 1151 connection_info.conn.result.proto, copy.copy(connection_info.addr_info), 1152 copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters 1153 ) 1154 connection_info.has_inline_processor = True 1155 self._inline_processor__init_parameters(connection_info, expected_client_info) 1156 1157 def _connect_to_super_server(self, expected_client_info: Client): 1158 """ 1159 Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()). 1160 Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую 1161 логику через select/poll/epoll: 1162 http://man7.org/linux/man-pages/man2/connect.2.html 1163 :param expected_client_info: 1164 :return: 1165 """ 1166 connection_settings = expected_client_info.connection_settings 1167 conn = None 1168 try: 1169 conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type, 1170 connection_settings.socket_protocol, connection_settings.socket_fileno) 1171 self.socket_by_fd[conn.fileno()] = conn 1172 # conn.setblocking(0) 1173 except (socket.error, OSError) as err: 1174 if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format( 1175 err.errno, err.strerror)) 1176 raise 1177 1178 try: 1179 conn.connect(connection_settings.socket_address) 1180 except (TimeoutError, socket.error, OSError) as err: 1181 # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout 1182 del self.socket_by_fd[conn.fileno()] 1183 conn.close() 1184 if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format( 1185 connection_settings.socket_address, err.errno, err.strerror)) 1186 raise 1187 1188 super_server_client_id = self.add_connection(conn, connection_settings.socket_address) 1189 1190 addr_info = host_names = None 1191 try: 1192 if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS): 1193 addr_info = socket.getaddrinfo(connection_settings.socket_address[0], 1194 connection_settings.socket_address[1]) 1195 host_names = socket.gethostbyaddr(connection_settings.socket_address[0]) 1196 except ConnectionError as err: 1197 # An established connection was aborted by the software in your host machine 1198 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1199 if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1200 connection_settings.socket_address, err.errno, err.strerror)) 1201 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1202 ok = False 1203 except (socket.error, OSError) as err: 1204 if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1205 connection_settings.socket_address, err.errno, err.strerror)) 1206 if err.errno in SET_OF_CONNECTION_ERRORS: 1207 # An established connection was aborted by the software in your host machine 1208 if __debug__: self._log( 1209 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1210 if __debug__: self._log( 1211 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1212 connection_settings.socket_address, err.errno, err.strerror)) 1213 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1214 ok = False 1215 else: 1216 if 'nt' == os.name: 1217 if errno.WSAECONNRESET == err.errno: 1218 # An existing connection was forcibly closed by the remote host 1219 if __debug__: self._log( 1220 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1221 if __debug__: self._log( 1222 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1223 connection_settings.socket_address, err.errno, err.strerror)) 1224 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1225 ok = False 1226 else: 1227 raise err 1228 else: 1229 raise err 1230 1231 super_server_client_info = self._connections[super_server_client_id] 1232 super_server_client_info.addr_info = addr_info 1233 super_server_client_info.host_names = host_names 1234 self._log_new_connection(super_server_client_info, False) 1235 1236 if expected_client_info.will_use_raw_connection_without_handshake: 1237 # Connection is made without handshake 1238 super_server_client_info.this_is_raw_connection = True 1239 self._unconfirmed_clients.remove(super_server_client_info.conn.result) 1240 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 1241 else: 1242 keyword = expected_client_info.connection_settings.keyword 1243 if keyword is None: 1244 keyword = self._get_own_keyword_appropriate_for_connection(conn) 1245 if keyword is None: 1246 # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления 1247 self._mark_connection_to_be_closed_immediately(super_server_client_info) 1248 self._mark_connection_as_ready_for_deletion(super_server_client_info) 1249 raise Exception('Own keyword should be provided in connection_settings!') 1250 if keyword: 1251 # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу). 1252 self._send_message_through_connection(super_server_client_info, keyword) 1253 1254 expected_client_info.connection_id = super_server_client_id 1255 expected_client_info._Client__connection = super_server_client_info 1256 self._conns_of_expected_clients[conn] = expected_client_info.id 1257 super_server_client_info.connected_expected_client_id = expected_client_info.id 1258 super_server_client_info.connected_expected_client = expected_client_info 1259 1260 def _get_own_keyword_appropriate_for_connection(self, conn): 1261 keyword = None 1262 random_keyword = None 1263 for gate_connection_settings in self.gates_connections_settings: 1264 random_keyword = gate_connection_settings.keyword 1265 if (conn.family == gate_connection_settings.socket_family) and ( 1266 conn.type == gate_connection_settings.socket_type) and \ 1267 (conn.proto == gate_connection_settings.socket_protocol): 1268 keyword = gate_connection_settings.keyword 1269 if keyword is None: 1270 keyword = random_keyword 1271 return keyword 1272 1273 def _pack_message(self, data): 1274 return len(data).to_bytes(self.message_size_len, 'little') + data 1275 1276 def _remove_client(self, expected_client_id): 1277 expected_client_info = self._expected_clients[expected_client_id] 1278 del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] 1279 del self._expected_clients[expected_client_id] 1280 connection_info = expected_client_info._Client__connection 1281 if connection_info is not None: 1282 # you can remove expected client before it will be connected to the server 1283 del self._conns_of_expected_clients[connection_info.conn.result] 1284 expected_client_info.connection_id = None 1285 expected_client_info._Client__connection = None 1286 1287 def _add_unexpected_client(self, connection_id, keyword): 1288 connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, keyword=keyword) 1289 unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(), 1290 connection_id) 1291 1292 self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info 1293 self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id 1294 connection_info = unexpected_client_info._Client__connection 1295 self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id 1296 1297 return unexpected_client_info.id 1298 1299 def _remove_unexpected_client(self, unexpected_client_id): 1300 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 1301 connection_info = unexpected_client_info._Client__connection # unexpected client appears only after 1302 # connection 1303 del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword] 1304 del self._unexpected_clients[unexpected_client_id] 1305 del self._conns_of_unexpected_clients[connection_info.conn.result] 1306 unexpected_client_info.connection_id = None 1307 unexpected_client_info._Client__connection = None 1308 1309 # @profile 1310 def _accept_new_connection(self, readable_socket: socket.socket): 1311 # One of a "readable" self._gate sockets is ready to accept a connection 1312 ok = True 1313 while ok: 1314 connection_id = None 1315 client_info = None 1316 1317 connection = None 1318 client_address = None 1319 try: 1320 connection, client_address = readable_socket.accept() 1321 self.socket_by_fd[connection.fileno()] = connection 1322 connection_id = self.add_connection(connection, client_address) 1323 client_info = self._connections[connection_id] 1324 except BlockingIOError as err: 1325 ok = False 1326 except InterruptedError as err: 1327 pass 1328 except (socket.error, OSError) as err: 1329 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1330 ok = False 1331 elif errno.EINTR == err.errno: 1332 pass 1333 elif errno.ECONNABORTED == err.errno: 1334 ok = False 1335 elif errno.EMFILE == err.errno: 1336 if __debug__: self._log( 1337 'The per-process limit on the number of open file descriptors had been reached.') 1338 ok = False 1339 elif errno.ENFILE == err.errno: 1340 if __debug__: self._log( 1341 'The system-wide limit on the total number of open files had been reached.') 1342 ok = False 1343 elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno): 1344 if __debug__: self._log( 1345 'Not enough free memory. Allocation is limited by the socket buffer limits.') 1346 ok = False 1347 elif errno.EPROTO == err.errno: 1348 if __debug__: self._log('Protocol error.') 1349 ok = False 1350 elif errno.EPERM == err.errno: 1351 if __debug__: self._log('Firewall rules forbid connection.') 1352 ok = False 1353 else: 1354 raise err 1355 1356 if (connection is not None) and (client_info is not None): 1357 addr_info = host_names = None 1358 try: 1359 if self.should_get_client_addr_info_on_connection and \ 1360 (connection.family in INET_TYPE_CONNECTIONS): 1361 addr_info = socket.getaddrinfo(client_address[0], client_address[1]) 1362 host_names = socket.gethostbyaddr(client_address[0]) 1363 except ConnectionError as err: 1364 # An established connection was aborted by the software in your host machine 1365 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1366 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1367 client_address, err.errno, err.strerror)) 1368 self._mark_connection_to_be_closed_immediately(client_info) 1369 ok = False 1370 except (socket.error, OSError) as err: 1371 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1372 client_address, err.errno, err.strerror)) 1373 if err.errno in SET_OF_CONNECTION_ERRORS: 1374 # An established connection was aborted by the software in your host machine 1375 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1376 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1377 client_address, err.errno, err.strerror)) 1378 self._mark_connection_to_be_closed_immediately(client_info) 1379 ok = False 1380 else: 1381 if 'nt' == os.name: 1382 if errno.WSAECONNRESET == err.errno: 1383 # An existing connection was forcibly closed by the remote host 1384 if __debug__: self._log( 1385 'CLOSING {}: Connection reset by peer'.format(client_address)) 1386 if __debug__: self._log( 1387 'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1388 client_address, err.errno, err.strerror)) 1389 self._mark_connection_to_be_closed_immediately(client_info) 1390 ok = False 1391 else: 1392 raise err 1393 else: 1394 raise err 1395 1396 client_info.addr_info = addr_info 1397 client_info.host_names = host_names 1398 self._log_new_connection(client_info, True) 1399 1400 if self.need_to_auto_check_incoming_raw_connection \ 1401 and self.raw_checker_for_new_incoming_connections(self, client_info): 1402 # Is should be RAW: 1403 if self.unknown_clients_are_allowed: 1404 # Unknown clients are allowed - create Unknown Expected Client for this connection 1405 client_info.this_is_raw_connection = True 1406 self._unconfirmed_clients.remove(client_info.conn.result) 1407 self._create_unknown_client_from_connection(client_info) 1408 else: 1409 # Unknown clients are Non allowed - close connection. 1410 if __debug__: self._log( 1411 'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format( 1412 client_info.addr.result 1413 )) 1414 self._mark_connection_to_be_closed_immediately(client_info) 1415 1416 # @profile 1417 def _read_data_from_already_connected_socket__inner__memory_optimized(self, curr_client_info: Connection, 1418 possible_messages_in_client_input_fifo=False): 1419 ok = True 1420 1421 data = curr_client_info.conn.result.recv(curr_client_info.recv_buff_size) 1422 data_len = len(data) 1423 curr_client_info.calc_new_recv_buff_size(data_len) 1424 if data: 1425 data = memoryview(data) 1426 if curr_client_info.this_is_raw_connection: 1427 curr_client_info.input_from_client.put(data) 1428 possible_messages_in_client_input_fifo = True 1429 else: 1430 curr_client_info.raw_input_from_client.add_piece_of_data(data) 1431 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1432 curr_client_info) 1433 else: 1434 # Interpret empty result as closed connection 1435 if __debug__: self._log( 1436 'CLOSING {} after reading no data:'.format(curr_client_info.addr.result)) 1437 self._mark_connection_to_be_closed_immediately(curr_client_info) 1438 ok = False 1439 1440 result = (ok, possible_messages_in_client_input_fifo) 1441 return result 1442 1443 # @profile 1444 def _read_data_from_already_connected_socket__inner__speed_optimized(self, curr_client_info: Connection, 1445 possible_messages_in_client_input_fifo=False): 1446 ok = True 1447 1448 if curr_client_info.current_memoryview_input: 1449 nbytes = 0 1450 try: 1451 nbytes = curr_client_info.conn.result.recv_into(curr_client_info.current_memoryview_input) 1452 except TimeoutError: 1453 # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout 1454 pass 1455 1456 if nbytes > 0: 1457 data = curr_client_info.current_memoryview_input[:nbytes] 1458 curr_client_info.current_memoryview_input = curr_client_info.current_memoryview_input[nbytes:] 1459 1460 if curr_client_info.this_is_raw_connection: 1461 curr_client_info.input_from_client.put(data) 1462 possible_messages_in_client_input_fifo = True 1463 else: 1464 curr_client_info.raw_input_from_client.add_piece_of_data(data) 1465 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1466 curr_client_info) 1467 else: 1468 # Interpret empty result as closed connection 1469 if __debug__: self._log( 1470 'CLOSING {} after reading no data:'.format(curr_client_info.addr.result)) 1471 self._mark_connection_to_be_closed_immediately(curr_client_info) 1472 ok = False 1473 else: 1474 input_buffer = bytearray(self.socket_read_fixed_buffer_size.result) 1475 curr_client_info.current_memoryview_input = memoryview(input_buffer) 1476 1477 result = (ok, possible_messages_in_client_input_fifo) 1478 return result 1479 1480 # @profile 1481 def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket): 1482 possible_messages_in_client_input_fifo = False 1483 1484 curr_client_id = self._connection_by_conn[readable_socket] 1485 curr_client_info = self._connections[curr_client_id] 1486 1487 ok = True 1488 while ok: 1489 try: 1490 if self.use_speed_optimized_socket_read: 1491 ok, possible_messages_in_client_input_fifo = \ 1492 self._read_data_from_already_connected_socket__inner__speed_optimized( 1493 curr_client_info, possible_messages_in_client_input_fifo) 1494 else: 1495 ok, possible_messages_in_client_input_fifo = \ 1496 self._read_data_from_already_connected_socket__inner__memory_optimized( 1497 curr_client_info, possible_messages_in_client_input_fifo) 1498 break # makes IO faster on 10-30% on all modes (message/raw, data/http, 1499 # static_read_buffer/non_static_read_buffer). 1500 # Ускорение происходит за счет того, что: 1) данных оказывается не настолько много чтобы кеш процессора 1501 # переполнялся и требовалась бы работа с оперативной памятью; 2) при таком разбиении ввод и вывод 1502 # начинают происходить (на уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и 1503 # пустили в обработку. В это время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы. 1504 # После обработки мы пускаем данные на отправку и забираем очередную порцию данных из входного буфера. 1505 # В это время происходит отправка данных из буферов на уровне ОС. И т.д. 1506 except BlockingIOError as err: 1507 ok = False 1508 except InterruptedError as err: 1509 pass 1510 except ConnectionError as err: 1511 # An established connection was aborted by the software in your host machine 1512 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1513 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1514 curr_client_info.addr.result, err.errno, err.strerror)) 1515 self._mark_connection_to_be_closed_immediately(curr_client_info) 1516 ok = False 1517 except (socket.error, OSError) as err: 1518 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1519 ok = False 1520 elif errno.EINTR == err.errno: 1521 pass 1522 elif err.errno in SET_OF_CONNECTION_ERRORS: 1523 # An established connection was aborted by the software in your host machine 1524 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1525 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1526 curr_client_info.addr.result, err.errno, err.strerror)) 1527 self._mark_connection_to_be_closed_immediately(curr_client_info) 1528 ok = False 1529 else: 1530 if 'nt' == os.name: 1531 if errno.WSAECONNRESET == err.errno: 1532 # An existing connection was forcibly closed by the remote host 1533 if __debug__: self._log( 1534 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1535 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1536 curr_client_info.addr.result, err.errno, err.strerror)) 1537 self._mark_connection_to_be_closed_immediately(curr_client_info) 1538 ok = False 1539 elif errno.EHOSTUNREACH == err.errno: 1540 # OSError: [Errno 113] No route to host 1541 if __debug__: self._log( 1542 'CLOSING {}: No route to host'.format(curr_client_info.addr.result)) 1543 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1544 curr_client_info.addr.result, err.errno, err.strerror)) 1545 self._mark_connection_to_be_closed_immediately(curr_client_info) 1546 ok = False 1547 else: 1548 if __debug__: self._log( 1549 'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result)) 1550 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1551 curr_client_info.addr.result, err.errno, err.strerror)) 1552 raise err 1553 else: 1554 if errno.WSAEHOSTUNREACH == err.errno: 1555 # OSError: [Errno 113] No route to host 1556 if __debug__: self._log( 1557 'CLOSING {}: No route to host'.format(curr_client_info.addr.result)) 1558 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1559 curr_client_info.addr.result, err.errno, err.strerror)) 1560 self._mark_connection_to_be_closed_immediately(curr_client_info) 1561 ok = False 1562 else: 1563 if __debug__: self._log( 1564 'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result)) 1565 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1566 curr_client_info.addr.result, err.errno, err.strerror)) 1567 self._mark_connection_to_be_closed_immediately(curr_client_info) 1568 ok = False 1569 # raise err 1570 1571 read_is_forbidden = False 1572 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 1573 > self.global_in__data_size_limit.result: 1574 read_is_forbidden = True 1575 ok = False 1576 1577 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 1578 read_is_forbidden) 1579 if read_is_forbidden_test is not None: 1580 if read_is_forbidden_test: 1581 print('Read is suppressed until data will be processed.') 1582 else: 1583 print('Read is allowed: data is processed.') 1584 1585 return possible_messages_in_client_input_fifo 1586 1587 # @profile 1588 def _read_data_from_socket(self, readable_socket: socket.socket): 1589 possible_messages_in_client_input_fifo = False 1590 if readable_socket in self._gate: 1591 accept_is_forbidden = True 1592 if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \ 1593 <= self.global__data_size_limit.result: 1594 accept_is_forbidden = False 1595 self._accept_new_connection(readable_socket) 1596 1597 if __debug__: 1598 accept_is_forbidden_test = \ 1599 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger( 1600 accept_is_forbidden) 1601 if accept_is_forbidden_test is not None: 1602 if accept_is_forbidden_test: 1603 print('Accept is suppressed until data will be processed and/or out.') 1604 else: 1605 print('Accept is allowed: data is processed and/or out.') 1606 else: 1607 possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell( 1608 readable_socket) 1609 1610 return possible_messages_in_client_input_fifo 1611 1612 def _log_new_connection(self, client_info: Connection, is_incoming_connection): 1613 connection = client_info.conn.result 1614 client_address = client_info.addr.result 1615 addr_info = client_info.addr_info 1616 host_names = client_info.host_names 1617 1618 connection_type_string = 'OUTGOING' 1619 from_to = 'to' 1620 if is_incoming_connection: 1621 connection_type_string = 'INCOMING' 1622 from_to = 'from' 1623 1624 if __debug__: self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address)) 1625 if connection.family in {socket.AF_INET, socket.AF_INET6}: 1626 if __debug__: self._log('\taddr_info: {}'.format(addr_info)) 1627 if __debug__: self._log('\thost_names: {}'.format(host_names)) 1628 1629 # @profile 1630 def _write_data_to_socket(self, curr_client_info: Connection): 1631 # CAUTION: code here is optimized for speed - not for readability or beauty. 1632 1633 expected_client_info = curr_client_info.connected_expected_client 1634 writable_socket = curr_client_info.conn.result 1635 if curr_client_info.should_be_closed: 1636 curr_client_info.current_memoryview_output = None 1637 curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1638 return 1639 1640 ok = True 1641 first_pass = True 1642 can_call__inline_processor__on__output_buffers_are_empty = True 1643 while ok: 1644 try: 1645 if curr_client_info.current_memoryview_output: 1646 nsent = writable_socket.send(curr_client_info.current_memoryview_output) 1647 curr_client_info.current_memoryview_output = curr_client_info.current_memoryview_output[nsent:] 1648 else: 1649 curr_client_info.current_memoryview_output = None 1650 output_fifo_size = curr_client_info.output_to_client.size() 1651 if output_fifo_size > 1: 1652 result_data, result_size, result_qnt = \ 1653 curr_client_info.output_to_client.get_at_least_size(524288) 1654 if result_qnt > 1: 1655 curr_client_info.current_memoryview_output = memoryview(b''.join(result_data)) 1656 else: 1657 curr_client_info.current_memoryview_output = memoryview(result_data.popleft()) 1658 elif output_fifo_size == 1: 1659 curr_client_info.current_memoryview_output = memoryview(curr_client_info.output_to_client.get()) 1660 1661 if curr_client_info.current_memoryview_output is None: 1662 # if curr_client_info.ready_to_be_closed: 1663 # if first_pass: 1664 # # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1665 # # Это значит что все данные были отправлены, и можно закрывать соединение. 1666 # self._output_check_sockets.remove(writable_socket) 1667 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1668 # # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1669 # # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1670 # # сохранить сокет в списке проверяемых для отправки. 1671 # else: 1672 # self._output_check_sockets.remove(writable_socket) 1673 if not curr_client_info.ready_to_be_closed: 1674 # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1675 # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1676 # сохранить сокет в списке проверяемых для отправки. 1677 self._output_check_sockets.remove(writable_socket) 1678 if first_pass and curr_client_info.ready_to_be_closed: 1679 # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1680 # Это значит что все данные были отправлены, и можно закрывать соединение. 1681 self._output_check_sockets.remove(writable_socket) 1682 self._mark_connection_to_be_closed_immediately(curr_client_info) 1683 ok = False 1684 if expected_client_info: 1685 self._io_iteration_result.clients_with_empty_output_fifo.add( 1686 curr_client_info.connected_expected_client_id) 1687 if curr_client_info.has_inline_processor: 1688 if can_call__inline_processor__on__output_buffers_are_empty: 1689 if self._inline_processor__on__output_buffers_are_empty(curr_client_info, 1690 expected_client_info): 1691 ok = True 1692 can_call__inline_processor__on__output_buffers_are_empty = False 1693 except BlockingIOError as err: 1694 ok = False 1695 except InterruptedError as err: 1696 pass 1697 except ConnectionError as err: 1698 # An established connection was aborted by the software in your host machine 1699 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1700 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1701 curr_client_info.addr.result, err.errno, err.strerror)) 1702 self._mark_connection_to_be_closed_immediately(curr_client_info) 1703 ok = False 1704 except (socket.error, OSError) as err: 1705 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1706 ok = False 1707 elif errno.EINTR == err.errno: 1708 pass 1709 elif err.errno in SET_OF_CONNECTION_ERRORS: 1710 # Connection reset by peer 1711 if __debug__: self._log( 1712 'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1713 err.strerror)) 1714 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1715 curr_client_info.addr.result, err.errno, err.strerror)) 1716 self._mark_connection_to_be_closed_immediately(curr_client_info) 1717 ok = False 1718 else: 1719 if 'nt' == os.name: 1720 if errno.WSAECONNRESET == err.errno: 1721 # An existing connection was forcibly closed by the remote host 1722 if __debug__: self._log( 1723 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1724 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1725 curr_client_info.addr.result, err.errno, err.strerror)) 1726 self._mark_connection_to_be_closed_immediately(curr_client_info) 1727 ok = False 1728 else: 1729 raise err 1730 else: 1731 raise err 1732 first_pass = False 1733 1734 def _mark_connection_to_be_closed_immediately(self, client_info: Connection): 1735 client_info.should_be_closed = True 1736 client_info.current_memoryview_input = None 1737 self._connections_marked_to_be_closed_immediately.add(client_info.conn.result) 1738 if client_info.connected_expected_client_id is not None: 1739 self._io_iteration_result.clients_with_disconnected_connection.add( 1740 client_info.connected_expected_client_id) 1741 1742 def _mark_connection_as_ready_to_be_closed(self, client_info: Connection): 1743 client_info.ready_to_be_closed = True 1744 1745 def _mark_connection_as_ready_for_deletion(self, client_info: Connection): 1746 client_info.ready_for_deletion = True 1747 self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result) 1748 1749 def _handle_connection_error(self, writable_socket: socket.socket): 1750 # Data read from already connected client 1751 curr_client_id = self._connection_by_conn[writable_socket] 1752 curr_client_info = self._connections[curr_client_id] 1753 if curr_client_info.should_be_closed: 1754 return 1755 if __debug__: self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result)) 1756 self._mark_connection_to_be_closed_immediately(curr_client_info) 1757 1758 # @profile 1759 def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection): 1760 result = False 1761 1762 try: 1763 while True: 1764 if curr_client_info.current_message_length is None: 1765 current_message_length = curr_client_info.raw_input_from_client.get_data(self.message_size_len) 1766 if current_message_length is None: 1767 break 1768 1769 curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1770 1771 current_message = curr_client_info.raw_input_from_client.get_data( 1772 curr_client_info.current_message_length) 1773 if current_message is None: 1774 break 1775 else: 1776 curr_client_info.input_from_client.put(current_message) 1777 curr_client_info.current_message_length = None 1778 result = True 1779 except TypeError: 1780 pass 1781 1782 return result 1783 1784 def _send_message_through_connection(self, client_info: Connection, data): 1785 if client_info.conn.existence: 1786 client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1787 client_info.output_to_client.put(data) 1788 1789 self._output_check_sockets.add(client_info.conn.result) 1790 else: 1791 if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1792 raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!') 1793 1794 def _generate_list_of_messages_with_their_length(self, messages_list): 1795 for message in messages_list: 1796 yield len(message).to_bytes(self.message_size_len, 'little') 1797 yield message 1798 1799 def _send_messages_through_connection(self, client_info: Connection, messages_list): 1800 if client_info.conn.existence: 1801 client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list)) 1802 1803 self._output_check_sockets.add(client_info.conn.result) 1804 else: 1805 if __debug__: self._log( 1806 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1807 raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!') 1808 1809 def _send_message_through_connection_raw(self, client_info: Connection, data): 1810 if client_info.conn.existence: 1811 client_info.output_to_client.put(data) 1812 self._output_check_sockets.add(client_info.conn.result) 1813 else: 1814 if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1815 raise Exception( 1816 'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1817 1818 def _send_messages_through_connection_raw(self, client_info: Connection, messages_list): 1819 if client_info.conn.existence: 1820 client_info.output_to_client.extend(messages_list) 1821 self._output_check_sockets.add(client_info.conn.result) 1822 else: 1823 if __debug__: self._log( 1824 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1825 raise Exception( 1826 'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1827 1828 def _move_message_from_fifo_to_memoryview(self, client_info: Connection): 1829 if client_info.current_memoryview_output is None: 1830 if client_info.output_to_client.size(): 1831 client_info.current_memoryview_output = memoryview(client_info.output_to_client.get()) 1832 1833 # @profile 1834 def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1835 output_fifo_size = client_info.output_to_client.size() 1836 if output_fifo_size > 1: 1837 result_data, result_size, result_qnt = \ 1838 client_info.output_to_client.get_at_least_size(524288) 1839 if result_qnt > 1: 1840 client_info.current_memoryview_output = memoryview(b''.join(result_data)) 1841 else: 1842 client_info.current_memoryview_output = memoryview(result_data.popleft()) 1843 elif output_fifo_size == 1: 1844 client_info.current_memoryview_output = memoryview(client_info.output_to_client.get()) 1845 1846 def _process_client_keyword(self, client_socket: socket.socket): 1847 curr_client_id = self._connection_by_conn[client_socket] 1848 curr_client_info = self._connections[curr_client_id] 1849 1850 if curr_client_info.input_from_client.size() >= 0: 1851 expected_client_id = None 1852 expected_client_info = None 1853 1854 this_is_super_server_client = False 1855 if curr_client_info.connected_expected_client is not None: 1856 expected_client_info = curr_client_info.connected_expected_client 1857 expected_client_id = expected_client_info.id 1858 if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role: 1859 this_is_super_server_client = True 1860 1861 if this_is_super_server_client: 1862 # This is connection to Super-Server. So we expect an answer like b'OK' 1863 super_server_answer__keyword_accepted = curr_client_info.input_from_client.get() 1864 super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted) 1865 if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted: 1866 # Answer was acceptable 1867 self._unconfirmed_clients.remove(client_socket) 1868 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1869 if expected_client_info.will_use_raw_client_connection: 1870 curr_client_info.this_is_raw_connection = True 1871 else: 1872 # Answer was NOT acceptable 1873 self._mark_connection_to_be_closed_immediately(curr_client_info) 1874 self._mark_connection_as_ready_for_deletion(curr_client_info) 1875 if __debug__: self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format( 1876 super_server_answer__keyword_accepted)) 1877 else: 1878 # This is connection to client. So we expect a keyword 1879 keyword = curr_client_info.input_from_client.get() 1880 keyword = bytes(keyword) 1881 curr_client_info.keyword = keyword 1882 self._unconfirmed_clients.remove(client_socket) 1883 self._send_message_through_connection(curr_client_info, self.server_answer__keyword_accepted) 1884 1885 if keyword in self._keywords_for_expected_clients: 1886 # empty expected client was already registered 1887 expected_client_id = self._keywords_for_expected_clients[keyword] 1888 expected_client_info = self._expected_clients[expected_client_id] 1889 expected_client_info.connection_id = curr_client_id 1890 expected_client_info._Client__connection = curr_client_info 1891 self._conns_of_expected_clients[client_socket] = expected_client_id 1892 curr_client_info.connected_expected_client_id = expected_client_id 1893 curr_client_info.connected_expected_client = expected_client_info 1894 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1895 if expected_client_info.will_use_raw_client_connection: 1896 curr_client_info.this_is_raw_connection = True 1897 else: 1898 # it is unknown expected client 1899 if self.unexpected_clients_are_allowed: 1900 self._add_unexpected_client(curr_client_id, keyword) 1901 else: 1902 self._mark_connection_to_be_closed_immediately(curr_client_info) 1903 self._mark_connection_as_ready_for_deletion(curr_client_info) 1904 1905 def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1906 client_info = self._connections[self._connection_by_conn[readable_socket]] 1907 if client_info.connected_expected_client_id is not None: 1908 if client_info.input_from_client.size(): 1909 self._io_iteration_result.clients_have_data_to_read.add( 1910 client_info.connected_expected_client_id) 1911 if client_info.has_inline_processor: 1912 self._inline_processor__on__data_received(client_info) 1913 1914 def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1915 if readable_socket in self._conns_of_expected_clients: 1916 expected_client_id = self._conns_of_expected_clients[readable_socket] 1917 expected_client = self._expected_clients[expected_client_id] 1918 self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 1919 client_info = expected_client._Client__connection 1920 if client_info.has_inline_processor: 1921 self._inline_processor__on__data_received(client_info) 1922 1923 def _inline_processor__apply_parameters(self, connection_info: Connection, 1924 expected_client: Client): 1925 inline_processor = expected_client.obj_for_inline_processing 1926 1927 inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode 1928 connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode 1929 1930 if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately: 1931 inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False 1932 self._mark_connection_to_be_closed_immediately(connection_info) 1933 1934 if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed: 1935 inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False 1936 self._mark_connection_as_ready_to_be_closed(connection_info) 1937 1938 def _inline_processor__init_parameters(self, connection_info: Connection, 1939 expected_client: Client): 1940 inline_processor = expected_client.obj_for_inline_processing 1941 1942 inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection 1943 inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection 1944 1945 def _inline_processor__on__data_received(self, connection_info: Connection): 1946 expected_client = connection_info.connected_expected_client 1947 inline_processor = expected_client.obj_for_inline_processing 1948 1949 try: 1950 while connection_info.input_from_client.size(): 1951 inline_processor.on__data_received(connection_info.input_from_client.get()) 1952 1953 if inline_processor.output_messages: 1954 while inline_processor.output_messages: 1955 another_message = inline_processor.output_messages.popleft() 1956 if not connection_info.this_is_raw_connection: 1957 connection_info.output_to_client.put( 1958 len(another_message).to_bytes(self.message_size_len, 'little')) 1959 connection_info.output_to_client.put(another_message) 1960 self._output_check_sockets.add(connection_info.conn.result) 1961 if connection_info.output_to_client.get_data_full_size() >= 65536: 1962 self._write_data_to_socket(connection_info) 1963 return True 1964 except: 1965 self.remove_client(expected_client.id) 1966 exc = sys.exc_info() 1967 exception = exc 1968 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 1969 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 1970 exception = exception[:2] + (formatted_traceback,) 1971 trace_str = ''.join(exception[2]) 1972 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 1973 if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string)) 1974 1975 return False 1976 1977 def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection, 1978 expected_client: Client): 1979 inline_processor = expected_client.obj_for_inline_processing 1980 1981 if not connection_info.has_inline_processor: 1982 return False 1983 1984 try: 1985 inline_processor.on__output_buffers_are_empty() 1986 1987 if inline_processor.output_messages: 1988 while inline_processor.output_messages: 1989 another_message = inline_processor.output_messages.popleft() 1990 if not connection_info.this_is_raw_connection: 1991 connection_info.output_to_client.put( 1992 len(another_message).to_bytes(self.message_size_len, 'little')) 1993 connection_info.output_to_client.put(another_message) 1994 self._output_check_sockets.add(connection_info.conn.result) 1995 if connection_info.output_to_client.get_data_full_size() >= 65536: 1996 # self._write_data_to_socket(connection_info.conn.result) 1997 self._write_data_to_socket(connection_info) 1998 return True 1999 except: 2000 self.remove_client(expected_client.id) 2001 exc = sys.exc_info() 2002 exception = exc 2003 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2004 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2005 exception = exception[:2] + (formatted_traceback,) 2006 trace_str = ''.join(exception[2]) 2007 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2008 if __debug__: self._log( 2009 'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string)) 2010 2011 return False 2012 2013 def _inline_processor__on__connection_lost(self, connection_info: Connection, 2014 expected_client: Client): 2015 inline_processor = expected_client.obj_for_inline_processing 2016 2017 if not connection_info.has_inline_processor: 2018 return False 2019 2020 try: 2021 inline_processor.on__connection_lost() 2022 except: 2023 exc = sys.exc_info() 2024 exception = exc 2025 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2026 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2027 exception = exception[:2] + (formatted_traceback,) 2028 trace_str = ''.join(exception[2]) 2029 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2030 if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string)) 2031 self.remove_client(expected_client.id) 2032 2033 def _inline_processor__on__connection_lost_by_connection_id(self, connection_id): 2034 connection_info = self._connections[connection_id] 2035 expected_client_info = connection_info.connected_expected_client 2036 if (expected_client_info is not None) and connection_info.has_inline_processor: 2037 self._inline_processor__on__connection_lost(connection_info, expected_client_info) 2038 self._io_iteration_result.clients_with_disconnected_connection.remove( 2039 expected_client_info.id) 2040 2041 def _unlink_good_af_unix_sockets(self): 2042 if 'posix' == os.name: 2043 for gate_connection_settings in self.gates_connections_settings: 2044 if gate_connection_settings.socket_family == socket.AF_UNIX: 2045 try: 2046 os.unlink(gate_connection_settings.socket_address) 2047 except: 2048 if __debug__: self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format( 2049 gate_connection_settings.socket_address)) 2050 raise 2051 2052 def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings): 2053 if 'posix' == os.name: 2054 if connection_settings.socket_family == socket.AF_UNIX: 2055 if os.path.exists(connection_settings.socket_address): 2056 if __debug__: self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format( 2057 connection_settings.socket_address)) 2058 2059class ThereAreNoGateConections(Exception): 2060 pass 2061 2062class NotEnoughGateConnections(Exception): 2063 pass 2064 2065@contextmanager 2066def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool=False, backlog: int=1, 2067 should_have_all_desired_gate_connections: bool=False): 2068 try: 2069 desired_amount_of_gate_connections = len(asock_io_core_obj.gates_connections_settings) 2070 gate_connections_num = asock_io_core_obj.listen(backlog) 2071 if should_have_gate_connections and (not gate_connections_num): 2072 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!' 2073 asock_io_core_obj._log(error_text) 2074 raise ThereAreNoGateConections(error_text) 2075 else: 2076 if should_have_all_desired_gate_connections: 2077 if desired_amount_of_gate_connections != gate_connections_num: 2078 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: NOT ENOUGH GOOD GATE CONNECTIONS!' 2079 asock_io_core_obj._log(error_text) 2080 raise NotEnoughGateConnections(error_text) 2081 print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num)) 2082 yield asock_io_core_obj 2083 except: 2084 raise 2085 finally: 2086 asock_io_core_obj.io_iteration() 2087 asock_io_core_obj.destroy() 2088 asock_io_core_obj.io_iteration() 2089 if __debug__: print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150])) 2090 if __debug__: print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150])) 2091 2092 2093class CheckIsRawConnection: 2094 def __call__(self, asock_io_core: ASockIOCore, connection_info: Connection)->bool: 2095 """ 2096 :param asock_io_core: 2097 :param connection_info: 2098 :return: "True" if it is RAW connection for Unknow Client. "False" otherwise. 2099 """ 2100 result = False 2101 try: 2102 if connection_info.conn.result.family in {socket.AF_INET, socket.AF_INET6}: 2103 if connection_info.addr.result[0] not in asock_io_core.set_of_gate_addresses: 2104 # If connected not from local IP address 2105 result = True 2106 except: 2107 pass 2108 return result
73class IoIterationResult: 74 """ 75 ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки 76 (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых 77 сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать 78 новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве 79 случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, 80 при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля 81 успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же 82 считывание имеющихся результатов операций))) 83 """ 84 85 def __init__(self): 86 self.newly_connected_expected_clients = set() 87 self.newly_connected_unknown_clients = set() 88 self.clients_with_disconnected_connection = set() 89 self.clients_have_data_to_read = set() 90 self.clients_with_empty_output_fifo = set() 91 92 def update(self, other): 93 self.newly_connected_expected_clients.update(other.newly_connected_expected_clients) 94 self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients) 95 self.clients_with_disconnected_connection.update( 96 other.clients_with_disconnected_connection) 97 self.clients_have_data_to_read.update(other.clients_have_data_to_read) 98 self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo) 99 100 def remove(self, item): 101 if item in self.newly_connected_expected_clients: 102 self.newly_connected_expected_clients.remove(item) 103 if item in self.newly_connected_unknown_clients: 104 self.newly_connected_unknown_clients.remove(item) 105 if item in self.clients_with_disconnected_connection: 106 self.clients_with_disconnected_connection.remove(item) 107 if item in self.clients_have_data_to_read: 108 self.clients_have_data_to_read.remove(item) 109 if item in self.clients_with_empty_output_fifo: 110 self.clients_with_empty_output_fifo.remove(item) 111 112 def clear(self): 113 self.newly_connected_expected_clients.clear() 114 self.newly_connected_unknown_clients.clear() 115 self.clients_with_disconnected_connection.clear() 116 self.clients_have_data_to_read.clear() 117 self.clients_with_empty_output_fifo.clear()
([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс, при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же считывание имеющихся результатов операций)))
92 def update(self, other): 93 self.newly_connected_expected_clients.update(other.newly_connected_expected_clients) 94 self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients) 95 self.clients_with_disconnected_connection.update( 96 other.clients_with_disconnected_connection) 97 self.clients_have_data_to_read.update(other.clients_have_data_to_read) 98 self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo)
100 def remove(self, item): 101 if item in self.newly_connected_expected_clients: 102 self.newly_connected_expected_clients.remove(item) 103 if item in self.newly_connected_unknown_clients: 104 self.newly_connected_unknown_clients.remove(item) 105 if item in self.clients_with_disconnected_connection: 106 self.clients_with_disconnected_connection.remove(item) 107 if item in self.clients_have_data_to_read: 108 self.clients_have_data_to_read.remove(item) 109 if item in self.clients_with_empty_output_fifo: 110 self.clients_with_empty_output_fifo.remove(item)
120class ASockIOCoreMemoryManagement(IOCoreMemoryManagement): 121 def __init__(self): 122 super(ASockIOCoreMemoryManagement, self).__init__() 123 124 self.socket_read_fixed_buffer_size = ResultExistence(True, 125 1024 ** 2) # 1024**2 is the fastest fixed read buffer. 126 127 def link_to(self, parent): 128 super(ASockIOCoreMemoryManagement, self).link_to(parent) 129 try: 130 self.socket_read_fixed_buffer_size = parent.socket_read_fixed_buffer_size 131 except AttributeError: 132 pass
Inherited Members
- cengal.io.asock_io.versions.v_0.base.IOCoreMemoryManagement
- global__data_size_limit
- global_in__data_size_limit
- global_in__data_full_size
- global_in__deletable_data_full_size
- global_out__data_size_limit
- global_out__data_full_size
- global_out__deletable_data_full_size
135class Connection: 136 def __init__(self, 137 connection_id=None, 138 connection__conn_addr: tuple = None, 139 global_memory_management: ASockIOCoreMemoryManagement = None 140 ): 141 """ 142 143 :param connection_id: ID for this connection 144 :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address 145 :param global_memory_management: global memory management obj 146 """ 147 self.id = connection_id 148 if connection__conn_addr is None: 149 self.conn = ResultExistence(False, None) 150 self.addr = ResultExistence(False, None) 151 else: 152 self.conn = ResultExistence(True, connection__conn_addr[0]) 153 self.addr = ResultExistence(True, connection__conn_addr[1]) 154 155 self.addr_info = None 156 self.host_names = None 157 158 self.recv_buff_size_computer = RecvBuffSizeComputer() 159 self.recv_buff_size = 0 160 self.calc_new_recv_buff_size(0) 161 162 self.should_be_closed = False # socket should be closed immediately. For example because of IO error. 163 self.ready_to_be_closed = False # socket should be closed, after all messages had been sent to client. 164 self.ready_for_deletion = False # connection should be deleted immediately. For example because of unexpected 165 # keyword. 166 167 self.keyword = None 168 169 self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl( 170 external_data_length=global_memory_management.global_in__data_full_size) 171 self.current_message_length = None # length of current input message (or None, if size waw not read yet) 172 self.input_from_client = FIFODequeWithLengthControl( 173 external_data_full_size=global_memory_management.global_in__data_full_size) 174 self.current_memoryview_output = None 175 self.current_memoryview_input = None 176 self.output_to_client = FIFODequeWithLengthControl( 177 external_data_full_size=global_memory_management.global_out__data_full_size) 178 179 self.this_is_raw_connection = False 180 181 self.connected_expected_client_id = None 182 self.connected_expected_client = None 183 self.has_inline_processor = False 184 185 def calc_new_recv_buff_size(self, last_recv_amount): 186 self.recv_buff_size = self.recv_buff_size_computer.calc_new_recv_buff_size(last_recv_amount) 187 188 def remove(self): 189 self.raw_input_from_client.remove() 190 self.input_from_client.remove() 191 self.output_to_client.remove()
136 def __init__(self, 137 connection_id=None, 138 connection__conn_addr: tuple = None, 139 global_memory_management: ASockIOCoreMemoryManagement = None 140 ): 141 """ 142 143 :param connection_id: ID for this connection 144 :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address 145 :param global_memory_management: global memory management obj 146 """ 147 self.id = connection_id 148 if connection__conn_addr is None: 149 self.conn = ResultExistence(False, None) 150 self.addr = ResultExistence(False, None) 151 else: 152 self.conn = ResultExistence(True, connection__conn_addr[0]) 153 self.addr = ResultExistence(True, connection__conn_addr[1]) 154 155 self.addr_info = None 156 self.host_names = None 157 158 self.recv_buff_size_computer = RecvBuffSizeComputer() 159 self.recv_buff_size = 0 160 self.calc_new_recv_buff_size(0) 161 162 self.should_be_closed = False # socket should be closed immediately. For example because of IO error. 163 self.ready_to_be_closed = False # socket should be closed, after all messages had been sent to client. 164 self.ready_for_deletion = False # connection should be deleted immediately. For example because of unexpected 165 # keyword. 166 167 self.keyword = None 168 169 self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl( 170 external_data_length=global_memory_management.global_in__data_full_size) 171 self.current_message_length = None # length of current input message (or None, if size waw not read yet) 172 self.input_from_client = FIFODequeWithLengthControl( 173 external_data_full_size=global_memory_management.global_in__data_full_size) 174 self.current_memoryview_output = None 175 self.current_memoryview_input = None 176 self.output_to_client = FIFODequeWithLengthControl( 177 external_data_full_size=global_memory_management.global_out__data_full_size) 178 179 self.this_is_raw_connection = False 180 181 self.connected_expected_client_id = None 182 self.connected_expected_client = None 183 self.has_inline_processor = False
:param connection_id: ID for this connection :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address :param global_memory_management: global memory management obj
194class InlineProcessor: 195 __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names', 196 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately', 197 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages', 198 '__hold__client_id') 199 200 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 201 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 202 """ 203 204 :param keyword: client keyword. You may check for a known keywords to act appropriately 205 :param socket_family: 206 :param socket_type: 207 :param socket_proto: 208 :param addr_info: result of socket.getaddrinfo() call 209 :param host_names: result of socket.gethostbyaddr() call 210 """ 211 self.client_id = client_id 212 self.keyword = keyword 213 self.socket_family = socket_family 214 self.socket_type = socket_type 215 self.socket_proto = socket_proto 216 self.addr_info = addr_info 217 self.host_names = host_names 218 self.is_in_raw_mode = None 219 self.__hold__client_id = client_id 220 self.__set__is_in_raw_mode = False 221 self.__set__mark_socket_as_should_be_closed_immediately = False 222 self.__set__mark_socket_as_ready_to_be_closed = False 223 self.__external_parameters_set_trigger = external_parameters_set_trigger 224 225 # self.output_messages = FIFODequeWithLengthControl() 226 self.output_messages = deque() 227 # self.output_messages = list() 228 229 def on__data_received(self, data: bytes): 230 """ 231 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 232 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 233 be logged 234 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 235 """ 236 pass 237 238 def on__output_buffers_are_empty(self): 239 """ 240 Will be called immediately when all output data was send. 241 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 242 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 243 be logged 244 """ 245 pass 246 247 def on__connection_lost(self): 248 """ 249 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 250 Situation with unhandled exception will be logged. 251 """ 252 pass 253 254 def set__is_in_raw_mode(self, is_in_raw_mode: bool): 255 self.__set__is_in_raw_mode = is_in_raw_mode 256 self.__external_parameters_set_trigger.add(self.__hold__client_id) 257 258 def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool): 259 self.__set__mark_socket_as_should_be_closed_immediately = mark_socket_as 260 self.__external_parameters_set_trigger.add(self.__hold__client_id) 261 262 def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool): 263 self.__set__mark_socket_as_ready_to_be_closed = mark_socket_as 264 self.__external_parameters_set_trigger.add(self.__hold__client_id)
200 def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, 201 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None): 202 """ 203 204 :param keyword: client keyword. You may check for a known keywords to act appropriately 205 :param socket_family: 206 :param socket_type: 207 :param socket_proto: 208 :param addr_info: result of socket.getaddrinfo() call 209 :param host_names: result of socket.gethostbyaddr() call 210 """ 211 self.client_id = client_id 212 self.keyword = keyword 213 self.socket_family = socket_family 214 self.socket_type = socket_type 215 self.socket_proto = socket_proto 216 self.addr_info = addr_info 217 self.host_names = host_names 218 self.is_in_raw_mode = None 219 self.__hold__client_id = client_id 220 self.__set__is_in_raw_mode = False 221 self.__set__mark_socket_as_should_be_closed_immediately = False 222 self.__set__mark_socket_as_ready_to_be_closed = False 223 self.__external_parameters_set_trigger = external_parameters_set_trigger 224 225 # self.output_messages = FIFODequeWithLengthControl() 226 self.output_messages = deque() 227 # self.output_messages = list()
:param keyword: client keyword. You may check for a known keywords to act appropriately :param socket_family: :param socket_type: :param socket_proto: :param addr_info: result of socket.getaddrinfo() call :param host_names: result of socket.gethostbyaddr() call
229 def on__data_received(self, data: bytes): 230 """ 231 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 232 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 233 be logged 234 :param data: piece of input data if connection is in RAW-mode and full message otherwise. 235 """ 236 pass
Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged :param data: piece of input data if connection is in RAW-mode and full message otherwise.
238 def on__output_buffers_are_empty(self): 239 """ 240 Will be called immediately when all output data was send. 241 Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data 242 Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 243 be logged 244 """ 245 pass
Will be called immediately when all output data was send. Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will be logged
247 def on__connection_lost(self): 248 """ 249 Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. 250 Situation with unhandled exception will be logged. 251 """ 252 pass
Will be called after connection was closed. Current Inline Processor object will be destroyed after this call. Situation with unhandled exception will be logged.
267class Client: 268 def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None): 269 """ 270 271 :param client_id: ID of the expected client 272 :param client_tcp_id: ID of the connection 273 :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client, 274 and all - for the super server. 275 """ 276 self.id = client_id 277 self.connection_id = client_tcp_id 278 self.__connection = None # type: Optional[Connection] 279 self.connection_settings = connection_settings 280 self.connection_settings.check() 281 282 self.will_use_raw_client_connection = False 283 self.will_use_raw_connection_without_handshake = False 284 self.this_is_unknown_client = False 285 286 self.obj_for_inline_processing = None 287 288 def get_connection(self)->Connection: 289 return self.__connection
268 def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None): 269 """ 270 271 :param client_id: ID of the expected client 272 :param client_tcp_id: ID of the connection 273 :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client, 274 and all - for the super server. 275 """ 276 self.id = client_id 277 self.connection_id = client_tcp_id 278 self.__connection = None # type: Optional[Connection] 279 self.connection_settings = connection_settings 280 self.connection_settings.check() 281 282 self.will_use_raw_client_connection = False 283 self.will_use_raw_connection_without_handshake = False 284 self.this_is_unknown_client = False 285 286 self.obj_for_inline_processing = None
:param client_id: ID of the expected client :param client_tcp_id: ID of the connection :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client, and all - for the super server.
292class ASockIOCore(ASockIOCoreMemoryManagement): 293 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 294 """ 295 Port should not be open to a external world! 296 :param gates_connections_settings: set() of ConnectionSettings() 297 :return: 298 """ 299 super(ASockIOCore, self).__init__() 300 301 if os.name != 'nt': 302 self.po = select.poll() 303 self.last_all_sockets = set() # type: Set[int] 304 self.socket_by_fd = dict() # type: Dict[int, socket.socket] 305 306 self.check_sockets_sum_time = 0.0 307 self.check_sockets_qnt = 0 308 self.check_sockets_max_time = 0.0 309 310 self.gates_connections_settings = gates_connections_settings 311 if not self.gates_connections_settings: 312 self.gates_connections_settings = set() 313 # raise Exception('gates_connections_settings should be provided!') 314 for gates_connections_settings in self.gates_connections_settings: 315 gates_connections_settings.check() 316 self.faulty_connection_settings = set() 317 self._connection_settings_by_gate_conn = dict() 318 319 self.set_of_gate_addresses = set() 320 self._gate = set() 321 self.reuse_gate_addr = False 322 self.reuse_gate_port = False 323 324 self.message_size_len = MESSAGE_SIZE_LEN 325 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 326 327 self._connections = dict() # key: ID; data: Connection() 328 self._connection_by_conn = dict() # key: conn; data: ID 329 self._connections_id_gen = IDGenerator() 330 331 self._connections_marked_as_ready_to_be_closed = set() 332 self._connections_marked_to_be_closed_immediately = set() 333 self._connections_marked_as_ready_to_be_deleted = set() 334 335 self._unconfirmed_clients = set() 336 337 self._we_have_connections_for_select = False 338 self._input_check_sockets = set() 339 self._output_check_sockets = set() 340 self._exception_check_sockets = set() 341 342 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 343 self._expected_clients = dict() # key: ID; data: Client() 344 self._expected_clients_id_gen = IDGenerator() 345 self._keywords_for_expected_clients = dict() # key: keyword; data: info 346 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 347 348 self.unexpected_clients_are_allowed = True 349 350 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 351 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 352 self._unexpected_clients = dict() # key: ID; data: Client() 353 self._unexpected_clients_id_gen = IDGenerator() 354 self._keywords_of_unexpected_clients = dict() 355 self._conns_of_unexpected_clients = dict() 356 357 self._io_iteration_result = IoIterationResult() 358 359 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 360 self.need_to_auto_check_incoming_raw_connection = False 361 self.unknown_clients_are_allowed = False 362 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 363 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 364 365 self.echo_log = False 366 self._internal_log = deque() 367 368 self.recv_sizes = deque() 369 self.recv_buff_sizes = deque() 370 371 self.should_get_client_addr_info_on_connection = True 372 373 self.use_nodelay_inet = False 374 self.use_speed_optimized_socket_read = False 375 376 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 377 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 378 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 379 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 380 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 381 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 382 383 self.class_for_unknown_clients_inline_processing = None 384 385 self._clients_with_inline_processors_that_need_to_apply_parameters = set() 386 387 @staticmethod 388 def check_sockets_select(read: Set[int], write: Set[int], error: Set[int], 389 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 390 all_sockets = read | write | error 391 if all_sockets: 392 return select.select(read, 393 write, 394 error, 395 timeout) 396 else: 397 return set(), set(), set() 398 399 def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int], 400 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 401 read_events = select.POLLIN | select.POLLPRI 402 write_events = select.POLLOUT 403 except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL 404 if hasattr(select, 'POLLRDHUP'): 405 except_events |= select.POLLRDHUP 406 readable_events = {select.POLLIN, select.POLLPRI} 407 writable_events = {select.POLLOUT} 408 exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL} 409 if hasattr(select, 'POLLRDHUP'): 410 exceptional_events.add(select.POLLRDHUP) 411 all_events_set = readable_events | writable_events | exceptional_events 412 413 timeout = int(timeout * 1000) 414 415 # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets)) 416 all_sockets = read | write | error 417 # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets)) 418 new_sockets = all_sockets - self.last_all_sockets 419 # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets)) 420 still_sockets = all_sockets & self.last_all_sockets 421 # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets)) 422 deleted_sockets = self.last_all_sockets - all_sockets 423 # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets)) 424 self.last_all_sockets = all_sockets 425 426 for socket_fd in new_sockets: 427 event_mask = 0 428 if socket_fd in read: 429 event_mask |= read_events 430 if socket_fd in write: 431 event_mask |= write_events 432 if socket_fd in error: 433 event_mask |= except_events 434 # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 435 self.po.register(socket_fd, event_mask) 436 437 for socket_fd in still_sockets: 438 event_mask = 0 439 if socket_fd in read: 440 event_mask |= read_events 441 if socket_fd in write: 442 event_mask |= write_events 443 if socket_fd in error: 444 event_mask |= except_events 445 # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 446 self.po.modify(socket_fd, event_mask) 447 448 for socket_fd in deleted_sockets: 449 # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd)) 450 self.po.unregister(socket_fd) 451 452 poll_result = self.po.poll(timeout) 453 # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result)) 454 # sys.stdout.flush() 455 456 readable = set() 457 writable = set() 458 exceptional = set() 459 for socket_fd, event_mask in poll_result: 460 socket_events_set = set() 461 for another_event in all_events_set: 462 if event_mask & another_event: 463 socket_events_set.add(another_event) 464 465 if socket_events_set & readable_events: 466 readable.add(socket_fd) 467 if socket_events_set & writable_events: 468 writable.add(socket_fd) 469 if socket_events_set & exceptional_events: 470 exceptional.add(socket_fd) 471 472 return readable, writable, exceptional 473 474 def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket], 475 timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]: 476 all_sockets = read | write | error 477 if all_sockets: 478 read_fd = set() 479 write_fd = set() 480 error_fd = set() 481 for conn in read: 482 read_fd.add(conn.fileno()) 483 for conn in write: 484 write_fd.add(conn.fileno()) 485 for conn in error: 486 error_fd.add(conn.fileno()) 487 488 check_sockets = self.check_sockets_select 489 if os.name != 'nt': 490 check_sockets = self.check_sockets_poll 491 492 readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd, 493 write_fd, 494 error_fd, 495 timeout) 496 readable = set() 497 writable = set() 498 exceptional = set() 499 for fd in readable_fd: 500 readable.add(self.socket_by_fd[fd]) 501 for fd in writable_fd: 502 writable.add(self.socket_by_fd[fd]) 503 for fd in exceptional_fd: 504 exceptional.add(self.socket_by_fd[fd]) 505 return readable, writable, exceptional 506 else: 507 return set(), set(), set() 508 509 def gate_io_iteration(self, timeout=0.0): 510 result = self._io_iteration_result 511 if self._gate: 512 readable, writable, exceptional = self.check_sockets_select(self._gate, 513 set(), 514 set(), 515 timeout) 516 517 # Handle inputs 518 for s in readable: 519 self._read_data_from_socket(s) 520 521 self._io_iteration_result = IoIterationResult() 522 return result 523 524 # @profile 525 def io_iteration(self, timeout=0.0): 526 """ 527 528 :param timeout: timeout in seconds 529 :return: 530 """ 531 result = self._io_iteration_result 532 533 if self._we_have_connections_for_select: 534 # need_to_process = False 535 # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets 536 # if not (all_sockets - self._gate): 537 # timeout = 0.01 538 539 need_to_repeat = True 540 541 while need_to_repeat: 542 output_check_sockets = set() 543 544 # Is need to check writable sockets 545 need_to_check_writable_sockets = False 546 for s in self._output_check_sockets: 547 curr_client_info = self._connections[self._connection_by_conn[s]] 548 if curr_client_info.output_to_client.size(): 549 need_to_check_writable_sockets = True 550 break 551 552 if need_to_check_writable_sockets: 553 output_check_sockets = self._output_check_sockets 554 555 # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(), 556 # len(self._input_check_sockets), 557 # len(self._output_check_sockets), 558 # len(self._exception_check_sockets))) 559 # sys.stdout.flush() 560 check_sockets_start_time = time.perf_counter() 561 readable, writable, exceptional = self.check_sockets(self._input_check_sockets, 562 output_check_sockets, 563 self._exception_check_sockets, 564 timeout) 565 check_sockets_finish_time = time.perf_counter() 566 check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time 567 self.check_sockets_sum_time += check_sockets_delta_time 568 self.check_sockets_qnt += 1 569 if self.check_sockets_max_time < check_sockets_delta_time: 570 self.check_sockets_max_time = check_sockets_delta_time 571 check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt 572 # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format( 573 # check_sockets_delta_time, 574 # check_socket_average_time, 575 # self.check_sockets_sum_time, 576 # self.check_sockets_max_time 577 # )) 578 # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(), 579 # len(readable), 580 # len(writable), 581 # len(exceptional))) 582 # sys.stdout.flush() 583 584 read_is_forbidden = True 585 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 586 <= self.global_in__data_size_limit.result: 587 read_is_forbidden = False 588 589 # Handle inputs 590 for s in readable: 591 read_result = self._read_data_from_socket(s) 592 if read_result: 593 if s in self._unconfirmed_clients: 594 self._process_client_keyword(s) 595 self._check_is_client_have_data_to_read_in_fifo(s) 596 else: 597 self._client_have_data_to_read_in_fifo(s) 598 599 if __debug__: 600 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 601 read_is_forbidden) 602 if read_is_forbidden_test is not None: 603 if read_is_forbidden_test: 604 print('Read is suppressed until data will be processed.') 605 else: 606 print('Read is allowed: data is processed.') 607 608 # Handle outputs 609 for s in writable: 610 curr_client_info = self._connections[self._connection_by_conn[s]] 611 self._write_data_to_socket(curr_client_info) 612 # self._write_data_to_socket(s) 613 614 # Handle "exceptional conditions" 615 for s in exceptional: 616 self._handle_connection_error(s) 617 618 # Set parameters for inline processors 619 if self._clients_with_inline_processors_that_need_to_apply_parameters: 620 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 621 expected_client_info = self._expected_clients[ec_id] 622 connection_info = expected_client_info._Client__connection 623 self._inline_processor__apply_parameters(connection_info, expected_client_info) 624 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 625 626 # Close sockets 627 if self._connections_marked_to_be_closed_immediately: 628 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 629 self._connections_marked_to_be_closed_immediately = set() 630 for closeable_socket in sockets_should_be_closed_immediately: 631 connection_id = self._connection_by_conn[closeable_socket] 632 # self.close_connection_by_conn(closeable_socket) 633 self.close_connection(connection_id) 634 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 635 636 # Removing clients 637 if self._connections_marked_as_ready_to_be_deleted: 638 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 639 self._connections_marked_as_ready_to_be_deleted = set() 640 for faulty_socket in clients_ready_to_be_deleted: 641 self.remove_connection_by_conn(faulty_socket) 642 643 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 644 <= self.global_out__data_size_limit.result: 645 need_to_repeat = False 646 else: 647 need_to_repeat = True 648 649 need_to_repeat = False 650 651 if __debug__: 652 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 653 need_to_repeat) 654 if need_to_repeat_show is not None: 655 if need_to_repeat_show: 656 print('Work is suppressed until data will be out.') 657 else: 658 print('Work is allowed: data is out.') 659 660 self._io_iteration_result = IoIterationResult() 661 return result 662 663 def listen(self, backlog=1): 664 # backlog = backlog or 1 665 666 new_connection_settings = set() 667 for gate_connection_settings in self.gates_connections_settings: 668 gate = None 669 try: 670 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 671 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 672 self.socket_by_fd[gate.fileno()] = gate 673 gate.setblocking(0) 674 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 675 except (socket.error, OSError) as err: 676 gate = None 677 if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 678 err.errno, err.strerror)) 679 continue 680 681 if self.reuse_gate_port: 682 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 683 684 try: 685 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 686 gate.bind(gate_connection_settings.socket_address) 687 except (socket.error, OSError) as err: 688 del self.socket_by_fd[gate.fileno()] 689 gate.close() 690 gate = None 691 if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 692 gate_connection_settings.socket_address, err.errno, err.strerror)) 693 continue 694 try: 695 gate.listen(backlog) 696 except (socket.error, OSError) as err: 697 del self.socket_by_fd[gate.fileno()] 698 gate.close() 699 gate = None 700 if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 701 gate_connection_settings.socket_address, err.errno, err.strerror)) 702 continue 703 704 if self.reuse_gate_addr: 705 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 706 707 self._input_check_sockets.add(gate) 708 self._exception_check_sockets.add(gate) 709 710 if gate: 711 self._gate.add(gate) 712 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 713 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 714 elif socket.AF_UNIX == gate_connection_settings.socket_family: 715 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 716 else: 717 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 718 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 719 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 720 self._we_have_connections_for_select = True 721 new_connection_settings.add(gate_connection_settings) 722 else: 723 self.faulty_connection_settings.add(gate_connection_settings) 724 self.gates_connections_settings = new_connection_settings 725 726 return len(self.gates_connections_settings) 727 728 def close_all_connections(self): 729 if __debug__: self._log('CLOSE ALL CONNECTIONS:') 730 clients_list = dict(self._connections) 731 for connection_id, client_info in clients_list.items(): 732 self.close_connection(connection_id) 733 734 def remove_all_connections(self): 735 clients_list = dict(self._connections) 736 for connection_id, client_info in clients_list.items(): 737 self.remove_connection(connection_id) 738 739 def close(self): 740 for gate in self._gate: 741 del self.socket_by_fd[gate.fileno()] 742 gate.close() 743 744 if gate in self._input_check_sockets: 745 self._input_check_sockets.remove(gate) 746 if gate in self._exception_check_sockets: 747 self._exception_check_sockets.remove(gate) 748 749 if not self._input_check_sockets: 750 self._we_have_connections_for_select = False 751 self._unlink_good_af_unix_sockets() 752 753 def destroy(self): 754 self.close() 755 self.close_all_connections() 756 self.remove_all_connections() 757 758 def add_client(self, expected_client_info: Client): 759 """ 760 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 761 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 762 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 763 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 764 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 765 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 766 :param expected_client_info: link to Client() 767 :return: expected_client_id 768 """ 769 if (expected_client_info.connection_settings.keyword is None) \ 770 and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role): 771 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 772 773 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 774 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 775 expected_client_info.connection_settings.keyword)) 776 777 expected_client_info.id = self._expected_clients_id_gen() 778 779 if self.unexpected_clients_are_allowed: 780 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 781 # клиент уже подключен 782 unexpected_client_id = self._keywords_of_unexpected_clients[ 783 expected_client_info.connection_settings.keyword] 784 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 785 connection_info = expected_client_info._Client__connection 786 if ( 787 unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \ 788 (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role): 789 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 790 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 791 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 792 # подключение. 793 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 794 # отключен и соединение будет установлено в новом сокете. 795 expected_client_info.connection_id = unexpected_client_info.connection_id 796 expected_client_info._Client__connection = \ 797 unexpected_client_info._Client__connection 798 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 799 connection_info.connected_expected_client_id = expected_client_info.id 800 connection_info.connected_expected_client = expected_client_info 801 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 802 else: 803 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 804 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 805 # Необходимо его отключить. 806 self._mark_connection_to_be_closed_immediately(connection_info) 807 self._mark_connection_as_ready_for_deletion(connection_info) 808 self._remove_unexpected_client(unexpected_client_id) 809 else: 810 # клиент еще не подключен 811 expected_client_info.connection_id = None 812 expected_client_info._Client__connection = None 813 814 if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role: 815 self._connect_to_super_server(expected_client_info) 816 817 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 818 self._expected_clients[expected_client_info.id] = expected_client_info 819 820 return expected_client_info.id 821 822 def get_client_id_by_keyword(self, expected_client_keyword): 823 """ 824 :param expected_client_keyword: expected_client_keyword 825 :return: link to Client() 826 """ 827 return self._keywords_for_expected_clients[expected_client_keyword] 828 829 def get_client_info(self, expected_client_id): 830 """ 831 :param expected_client_id: expected_client_id 832 :return: link to Client() 833 """ 834 return self._expected_clients[expected_client_id] 835 836 def get_connection_input_fifo_size_for_client(self, expected_client_id): 837 expected_client_info = self._expected_clients[expected_client_id] 838 connection_info = expected_client_info._Client__connection 839 if connection_info is None: 840 raise Exception('Expected client was not connected yet!') 841 # if client_info.this_is_raw_connection: 842 # if client_info.input_from_client.size(): 843 # return 1 844 # else: 845 # return 0 846 # else: 847 # return client_info.input_from_client.size() 848 return connection_info.input_from_client.size() 849 850 def get_message_from_client(self, expected_client_id): 851 expected_client_info = self._expected_clients[expected_client_id] 852 connection_info = expected_client_info._Client__connection 853 if connection_info is None: 854 raise Exception('Expected client was not connected yet!') 855 if not connection_info.input_from_client.size(): 856 raise Exception('There is no readable data in expected client\'s FIFO!') 857 # if client_info.this_is_raw_connection: 858 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 859 return connection_info.input_from_client.get() 860 861 # @profile 862 def get_messages_from_client(self, expected_client_id): 863 expected_client_info = self._expected_clients[expected_client_id] 864 connection_info = expected_client_info._Client__connection 865 if connection_info is None: 866 raise Exception('Expected client was not connected yet!') 867 try: 868 while True: 869 yield connection_info.input_from_client.get() 870 except FIFOIsEmpty: 871 pass 872 # while client_info.input_from_client.size(): 873 # yield client_info.input_from_client.get() 874 875 def get_connection_output_fifo_size_for_client(self, expected_client_id): 876 expected_client_info = self._expected_clients[expected_client_id] 877 connection_info = expected_client_info._Client__connection 878 if connection_info is None: 879 raise Exception('Expected client was not connected yet!') 880 return connection_info.output_to_client.size() 881 882 def send_message_to_client(self, expected_client_id, data): 883 # data = bytes(data) 884 expected_client_info = self._expected_clients[expected_client_id] 885 connection_info = expected_client_info._Client__connection 886 if connection_info is None: 887 raise Exception('Expected client was not connected yet!') 888 if connection_info.this_is_raw_connection: 889 self._send_message_through_connection_raw(connection_info, data) 890 else: 891 self._send_message_through_connection(connection_info, data) 892 893 def send_messages_to_client(self, expected_client_id, messages_list): 894 # data = bytes(data) 895 expected_client_info = self._expected_clients[expected_client_id] 896 connection_info = expected_client_info._Client__connection 897 if connection_info is None: 898 raise Exception('Expected client was not connected yet!') 899 if connection_info.this_is_raw_connection: 900 self._send_messages_through_connection_raw(connection_info, messages_list) 901 else: 902 self._send_messages_through_connection(connection_info, messages_list) 903 904 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 905 expected_client_info = self._expected_clients[expected_client_id] 906 connection_info = expected_client_info._Client__connection 907 if connection_info is None: 908 raise Exception('Expected client was not connected yet!') 909 return connection_info.this_is_raw_connection 910 911 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 912 expected_client_info = self._expected_clients[expected_client_id] 913 connection_info = expected_client_info._Client__connection 914 if connection_info is None: 915 raise Exception('Expected client was not connected yet!') 916 connection_info.this_is_raw_connection = is_raw 917 918 def set_inline_processor_for_client(self, expected_client_id, 919 class_for_unknown_clients_inline_processing: type): 920 expected_client_info = self._expected_clients[expected_client_id] 921 connection_info = expected_client_info._Client__connection 922 if connection_info is None: 923 raise Exception('Expected client was not connected yet!') 924 self._set_inline_processor_for_client(connection_info, expected_client_info, 925 class_for_unknown_clients_inline_processing) 926 927 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 928 """ 929 Connection will be closed immediately (inside this method) 930 :param expected_client_id: 931 :param raise_if_already_closed: 932 :return: 933 """ 934 if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:') 935 expected_client_info = self._expected_clients[expected_client_id] 936 connection_info = expected_client_info._Client__connection 937 if raise_if_already_closed and (connection_info is None): 938 raise Exception('Expected client was not connected yet!') 939 self.close_connection(connection_info.id) 940 941 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 942 raise_if_already_closed=True): 943 """ 944 Connection will be closed immediately (inside main IO loop) 945 :param expected_client_id: 946 :param raise_if_already_closed: 947 :return: 948 """ 949 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 950 expected_client_info = self._expected_clients[expected_client_id] 951 connection_info = expected_client_info._Client__connection 952 if raise_if_already_closed and (connection_info is None): 953 raise Exception('Expected client was not connected yet!') 954 self._mark_connection_to_be_closed_immediately(connection_info) 955 956 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 957 """ 958 Connection will be closed when all output will be sent (inside main IO loop). 959 :param expected_client_id: 960 :param raise_if_already_closed: 961 :return: 962 """ 963 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 964 expected_client_info = self._expected_clients[expected_client_id] 965 connection_info = expected_client_info._Client__connection 966 if raise_if_already_closed and (connection_info is None): 967 raise Exception('Expected client was not connected yet!') 968 self._mark_connection_as_ready_to_be_closed(connection_info) 969 970 def remove_client(self, expected_client_id): 971 if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 972 expected_client_info = self._expected_clients[expected_client_id] 973 if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 974 connection_id = expected_client_info.connection_id 975 if connection_id is None: 976 self._remove_client(expected_client_id) 977 self.remove_connection(connection_id) 978 979 def add_connection(self, conn, address): 980 """ 981 :param conn: socket 982 :param address: address 983 :return: client ID 984 """ 985 if conn is None: 986 raise TypeError('conn should not be None!') 987 988 self.socket_by_fd[conn.fileno()] = conn 989 conn.setblocking(0) 990 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 991 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 992 993 new_client_id = self._connections_id_gen() 994 995 client_info = Connection(new_client_id, (conn, address), self) 996 self._connections[new_client_id] = client_info 997 self._connection_by_conn[conn] = new_client_id 998 self._input_check_sockets.add(conn) 999 self._exception_check_sockets.add(conn) 1000 self._we_have_connections_for_select = True 1001 1002 self._unconfirmed_clients.add(conn) 1003 1004 return new_client_id 1005 1006 def check_connection_existance(self, connection_id): 1007 if connection_id not in self._expected_clients: 1008 return False 1009 if connection_id not in self._connections: 1010 return False 1011 client_info = self._connections[connection_id] 1012 if not client_info.conn.existence: 1013 return False 1014 conn = client_info.conn.result 1015 if conn is None: 1016 return False 1017 return True 1018 1019 def close_connection(self, connection_id): 1020 if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id)) 1021 client_info = self._connections[connection_id] 1022 if not client_info.conn.existence: 1023 if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 1024 return 1025 conn = client_info.conn.result 1026 if conn is None: 1027 if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 1028 return 1029 1030 del self.socket_by_fd[conn.fileno()] 1031 conn.close() 1032 client_info.conn.existence = False 1033 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 1034 # memory even before destroying 1035 1036 if connection_id in self._connections_marked_as_ready_to_be_closed: 1037 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 1038 if conn in self._connections_marked_to_be_closed_immediately: 1039 self._connections_marked_to_be_closed_immediately.remove(conn) 1040 if conn in self._connection_by_conn: 1041 del self._connection_by_conn[conn] 1042 if conn in self._input_check_sockets: 1043 self._input_check_sockets.remove(conn) 1044 if conn in self._output_check_sockets: 1045 self._output_check_sockets.remove(conn) 1046 if conn in self._exception_check_sockets: 1047 self._exception_check_sockets.remove(conn) 1048 1049 if not self._input_check_sockets: 1050 self._we_have_connections_for_select = False 1051 1052 if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id)) 1053 1054 def close_connection_by_conn(self, conn): 1055 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 1056 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 1057 if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 1058 connection_id = self._connection_by_conn[conn] 1059 if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 1060 self.close_connection(connection_id) 1061 1062 def remove_connection(self, connection_id): 1063 # client should NOT be removed immediately after connection close (close_connection): 1064 # code should do it by itself after reading all available input data 1065 if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id)) 1066 client_info = self._connections[connection_id] 1067 if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 1068 if client_info.conn.existence: 1069 self.close_connection(connection_id) 1070 conn = client_info.conn.result 1071 if conn is None: 1072 return 1073 1074 if conn in self._conns_of_unexpected_clients: 1075 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 1076 1077 # if conn in self._conns_of_expected_clients: 1078 # self._remove_client(self._conns_of_expected_clients[conn]) 1079 if client_info.connected_expected_client_id is not None: 1080 self._remove_client(client_info.connected_expected_client_id) 1081 1082 client_info.connected_expected_client_id = None 1083 client_info.connected_expected_client = None 1084 1085 if connection_id in self._connections_marked_as_ready_to_be_deleted: 1086 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 1087 1088 client_info.conn.existence = False 1089 client_info.conn.result = None 1090 1091 del self._connections[connection_id] 1092 if conn in self._connection_by_conn: 1093 del self._connection_by_conn[conn] 1094 if conn in self._input_check_sockets: 1095 self._input_check_sockets.remove(conn) 1096 if conn in self._output_check_sockets: 1097 self._output_check_sockets.remove(conn) 1098 if conn in self._exception_check_sockets: 1099 self._exception_check_sockets.remove(conn) 1100 1101 # client_info.remove() 1102 1103 def remove_connection_by_conn(self, conn): 1104 connection_id = self._connection_by_conn[conn] 1105 self.remove_connection(connection_id) 1106 1107 def _log(self, log_string): 1108 self._internal_log.append(log_string) 1109 if self.echo_log: 1110 print(log_string) 1111 sys.stdout.flush() 1112 1113 def _create_unknown_client_from_connection(self, client_info: Connection): 1114 keyword = None 1115 keyword_is_ok = False 1116 while not keyword_is_ok: 1117 keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode() 1118 if keyword not in self._keywords_for_expected_clients: 1119 keyword_is_ok = True 1120 connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, 1121 socket_address=client_info.addr.result, keyword=keyword) 1122 expected_client_info = Client(connection_settings) 1123 expected_client_info.id = self._expected_clients_id_gen() 1124 1125 expected_client_info.connection_id = client_info.id 1126 expected_client_info._Client__connection = client_info 1127 expected_client_info.will_use_raw_client_connection = True 1128 expected_client_info.will_use_raw_connection_without_handshake = True 1129 expected_client_info.this_is_unknown_client = True 1130 if self.class_for_unknown_clients_inline_processing is not None: 1131 self._set_inline_processor_for_client(client_info, expected_client_info, 1132 self.class_for_unknown_clients_inline_processing) 1133 1134 self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id 1135 client_info.connected_expected_client_id = expected_client_info.id 1136 client_info.connected_expected_client = expected_client_info 1137 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 1138 self._expected_clients[expected_client_info.id] = expected_client_info 1139 1140 self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id) 1141 1142 return expected_client_info.id 1143 1144 def _set_inline_processor_for_client(self, connection_info: Connection, 1145 expected_client_info: Client, 1146 class_for_unknown_clients_inline_processing: type): 1147 assert type(class_for_unknown_clients_inline_processing) == type, \ 1148 '.class_for_unknown_clients_inline_processing must be a class or None' 1149 keyword = expected_client_info.connection_settings.keyword 1150 expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing( 1151 expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type, 1152 connection_info.conn.result.proto, copy.copy(connection_info.addr_info), 1153 copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters 1154 ) 1155 connection_info.has_inline_processor = True 1156 self._inline_processor__init_parameters(connection_info, expected_client_info) 1157 1158 def _connect_to_super_server(self, expected_client_info: Client): 1159 """ 1160 Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()). 1161 Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую 1162 логику через select/poll/epoll: 1163 http://man7.org/linux/man-pages/man2/connect.2.html 1164 :param expected_client_info: 1165 :return: 1166 """ 1167 connection_settings = expected_client_info.connection_settings 1168 conn = None 1169 try: 1170 conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type, 1171 connection_settings.socket_protocol, connection_settings.socket_fileno) 1172 self.socket_by_fd[conn.fileno()] = conn 1173 # conn.setblocking(0) 1174 except (socket.error, OSError) as err: 1175 if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format( 1176 err.errno, err.strerror)) 1177 raise 1178 1179 try: 1180 conn.connect(connection_settings.socket_address) 1181 except (TimeoutError, socket.error, OSError) as err: 1182 # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout 1183 del self.socket_by_fd[conn.fileno()] 1184 conn.close() 1185 if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format( 1186 connection_settings.socket_address, err.errno, err.strerror)) 1187 raise 1188 1189 super_server_client_id = self.add_connection(conn, connection_settings.socket_address) 1190 1191 addr_info = host_names = None 1192 try: 1193 if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS): 1194 addr_info = socket.getaddrinfo(connection_settings.socket_address[0], 1195 connection_settings.socket_address[1]) 1196 host_names = socket.gethostbyaddr(connection_settings.socket_address[0]) 1197 except ConnectionError as err: 1198 # An established connection was aborted by the software in your host machine 1199 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1200 if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1201 connection_settings.socket_address, err.errno, err.strerror)) 1202 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1203 ok = False 1204 except (socket.error, OSError) as err: 1205 if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1206 connection_settings.socket_address, err.errno, err.strerror)) 1207 if err.errno in SET_OF_CONNECTION_ERRORS: 1208 # An established connection was aborted by the software in your host machine 1209 if __debug__: self._log( 1210 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1211 if __debug__: self._log( 1212 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1213 connection_settings.socket_address, err.errno, err.strerror)) 1214 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1215 ok = False 1216 else: 1217 if 'nt' == os.name: 1218 if errno.WSAECONNRESET == err.errno: 1219 # An existing connection was forcibly closed by the remote host 1220 if __debug__: self._log( 1221 'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address)) 1222 if __debug__: self._log( 1223 'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1224 connection_settings.socket_address, err.errno, err.strerror)) 1225 self._mark_connection_to_be_closed_immediately(super_server_client_id) 1226 ok = False 1227 else: 1228 raise err 1229 else: 1230 raise err 1231 1232 super_server_client_info = self._connections[super_server_client_id] 1233 super_server_client_info.addr_info = addr_info 1234 super_server_client_info.host_names = host_names 1235 self._log_new_connection(super_server_client_info, False) 1236 1237 if expected_client_info.will_use_raw_connection_without_handshake: 1238 # Connection is made without handshake 1239 super_server_client_info.this_is_raw_connection = True 1240 self._unconfirmed_clients.remove(super_server_client_info.conn.result) 1241 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 1242 else: 1243 keyword = expected_client_info.connection_settings.keyword 1244 if keyword is None: 1245 keyword = self._get_own_keyword_appropriate_for_connection(conn) 1246 if keyword is None: 1247 # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления 1248 self._mark_connection_to_be_closed_immediately(super_server_client_info) 1249 self._mark_connection_as_ready_for_deletion(super_server_client_info) 1250 raise Exception('Own keyword should be provided in connection_settings!') 1251 if keyword: 1252 # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу). 1253 self._send_message_through_connection(super_server_client_info, keyword) 1254 1255 expected_client_info.connection_id = super_server_client_id 1256 expected_client_info._Client__connection = super_server_client_info 1257 self._conns_of_expected_clients[conn] = expected_client_info.id 1258 super_server_client_info.connected_expected_client_id = expected_client_info.id 1259 super_server_client_info.connected_expected_client = expected_client_info 1260 1261 def _get_own_keyword_appropriate_for_connection(self, conn): 1262 keyword = None 1263 random_keyword = None 1264 for gate_connection_settings in self.gates_connections_settings: 1265 random_keyword = gate_connection_settings.keyword 1266 if (conn.family == gate_connection_settings.socket_family) and ( 1267 conn.type == gate_connection_settings.socket_type) and \ 1268 (conn.proto == gate_connection_settings.socket_protocol): 1269 keyword = gate_connection_settings.keyword 1270 if keyword is None: 1271 keyword = random_keyword 1272 return keyword 1273 1274 def _pack_message(self, data): 1275 return len(data).to_bytes(self.message_size_len, 'little') + data 1276 1277 def _remove_client(self, expected_client_id): 1278 expected_client_info = self._expected_clients[expected_client_id] 1279 del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] 1280 del self._expected_clients[expected_client_id] 1281 connection_info = expected_client_info._Client__connection 1282 if connection_info is not None: 1283 # you can remove expected client before it will be connected to the server 1284 del self._conns_of_expected_clients[connection_info.conn.result] 1285 expected_client_info.connection_id = None 1286 expected_client_info._Client__connection = None 1287 1288 def _add_unexpected_client(self, connection_id, keyword): 1289 connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, keyword=keyword) 1290 unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(), 1291 connection_id) 1292 1293 self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info 1294 self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id 1295 connection_info = unexpected_client_info._Client__connection 1296 self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id 1297 1298 return unexpected_client_info.id 1299 1300 def _remove_unexpected_client(self, unexpected_client_id): 1301 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 1302 connection_info = unexpected_client_info._Client__connection # unexpected client appears only after 1303 # connection 1304 del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword] 1305 del self._unexpected_clients[unexpected_client_id] 1306 del self._conns_of_unexpected_clients[connection_info.conn.result] 1307 unexpected_client_info.connection_id = None 1308 unexpected_client_info._Client__connection = None 1309 1310 # @profile 1311 def _accept_new_connection(self, readable_socket: socket.socket): 1312 # One of a "readable" self._gate sockets is ready to accept a connection 1313 ok = True 1314 while ok: 1315 connection_id = None 1316 client_info = None 1317 1318 connection = None 1319 client_address = None 1320 try: 1321 connection, client_address = readable_socket.accept() 1322 self.socket_by_fd[connection.fileno()] = connection 1323 connection_id = self.add_connection(connection, client_address) 1324 client_info = self._connections[connection_id] 1325 except BlockingIOError as err: 1326 ok = False 1327 except InterruptedError as err: 1328 pass 1329 except (socket.error, OSError) as err: 1330 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1331 ok = False 1332 elif errno.EINTR == err.errno: 1333 pass 1334 elif errno.ECONNABORTED == err.errno: 1335 ok = False 1336 elif errno.EMFILE == err.errno: 1337 if __debug__: self._log( 1338 'The per-process limit on the number of open file descriptors had been reached.') 1339 ok = False 1340 elif errno.ENFILE == err.errno: 1341 if __debug__: self._log( 1342 'The system-wide limit on the total number of open files had been reached.') 1343 ok = False 1344 elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno): 1345 if __debug__: self._log( 1346 'Not enough free memory. Allocation is limited by the socket buffer limits.') 1347 ok = False 1348 elif errno.EPROTO == err.errno: 1349 if __debug__: self._log('Protocol error.') 1350 ok = False 1351 elif errno.EPERM == err.errno: 1352 if __debug__: self._log('Firewall rules forbid connection.') 1353 ok = False 1354 else: 1355 raise err 1356 1357 if (connection is not None) and (client_info is not None): 1358 addr_info = host_names = None 1359 try: 1360 if self.should_get_client_addr_info_on_connection and \ 1361 (connection.family in INET_TYPE_CONNECTIONS): 1362 addr_info = socket.getaddrinfo(client_address[0], client_address[1]) 1363 host_names = socket.gethostbyaddr(client_address[0]) 1364 except ConnectionError as err: 1365 # An established connection was aborted by the software in your host machine 1366 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1367 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1368 client_address, err.errno, err.strerror)) 1369 self._mark_connection_to_be_closed_immediately(client_info) 1370 ok = False 1371 except (socket.error, OSError) as err: 1372 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1373 client_address, err.errno, err.strerror)) 1374 if err.errno in SET_OF_CONNECTION_ERRORS: 1375 # An established connection was aborted by the software in your host machine 1376 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address)) 1377 if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1378 client_address, err.errno, err.strerror)) 1379 self._mark_connection_to_be_closed_immediately(client_info) 1380 ok = False 1381 else: 1382 if 'nt' == os.name: 1383 if errno.WSAECONNRESET == err.errno: 1384 # An existing connection was forcibly closed by the remote host 1385 if __debug__: self._log( 1386 'CLOSING {}: Connection reset by peer'.format(client_address)) 1387 if __debug__: self._log( 1388 'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format( 1389 client_address, err.errno, err.strerror)) 1390 self._mark_connection_to_be_closed_immediately(client_info) 1391 ok = False 1392 else: 1393 raise err 1394 else: 1395 raise err 1396 1397 client_info.addr_info = addr_info 1398 client_info.host_names = host_names 1399 self._log_new_connection(client_info, True) 1400 1401 if self.need_to_auto_check_incoming_raw_connection \ 1402 and self.raw_checker_for_new_incoming_connections(self, client_info): 1403 # Is should be RAW: 1404 if self.unknown_clients_are_allowed: 1405 # Unknown clients are allowed - create Unknown Expected Client for this connection 1406 client_info.this_is_raw_connection = True 1407 self._unconfirmed_clients.remove(client_info.conn.result) 1408 self._create_unknown_client_from_connection(client_info) 1409 else: 1410 # Unknown clients are Non allowed - close connection. 1411 if __debug__: self._log( 1412 'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format( 1413 client_info.addr.result 1414 )) 1415 self._mark_connection_to_be_closed_immediately(client_info) 1416 1417 # @profile 1418 def _read_data_from_already_connected_socket__inner__memory_optimized(self, curr_client_info: Connection, 1419 possible_messages_in_client_input_fifo=False): 1420 ok = True 1421 1422 data = curr_client_info.conn.result.recv(curr_client_info.recv_buff_size) 1423 data_len = len(data) 1424 curr_client_info.calc_new_recv_buff_size(data_len) 1425 if data: 1426 data = memoryview(data) 1427 if curr_client_info.this_is_raw_connection: 1428 curr_client_info.input_from_client.put(data) 1429 possible_messages_in_client_input_fifo = True 1430 else: 1431 curr_client_info.raw_input_from_client.add_piece_of_data(data) 1432 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1433 curr_client_info) 1434 else: 1435 # Interpret empty result as closed connection 1436 if __debug__: self._log( 1437 'CLOSING {} after reading no data:'.format(curr_client_info.addr.result)) 1438 self._mark_connection_to_be_closed_immediately(curr_client_info) 1439 ok = False 1440 1441 result = (ok, possible_messages_in_client_input_fifo) 1442 return result 1443 1444 # @profile 1445 def _read_data_from_already_connected_socket__inner__speed_optimized(self, curr_client_info: Connection, 1446 possible_messages_in_client_input_fifo=False): 1447 ok = True 1448 1449 if curr_client_info.current_memoryview_input: 1450 nbytes = 0 1451 try: 1452 nbytes = curr_client_info.conn.result.recv_into(curr_client_info.current_memoryview_input) 1453 except TimeoutError: 1454 # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout 1455 pass 1456 1457 if nbytes > 0: 1458 data = curr_client_info.current_memoryview_input[:nbytes] 1459 curr_client_info.current_memoryview_input = curr_client_info.current_memoryview_input[nbytes:] 1460 1461 if curr_client_info.this_is_raw_connection: 1462 curr_client_info.input_from_client.put(data) 1463 possible_messages_in_client_input_fifo = True 1464 else: 1465 curr_client_info.raw_input_from_client.add_piece_of_data(data) 1466 possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo( 1467 curr_client_info) 1468 else: 1469 # Interpret empty result as closed connection 1470 if __debug__: self._log( 1471 'CLOSING {} after reading no data:'.format(curr_client_info.addr.result)) 1472 self._mark_connection_to_be_closed_immediately(curr_client_info) 1473 ok = False 1474 else: 1475 input_buffer = bytearray(self.socket_read_fixed_buffer_size.result) 1476 curr_client_info.current_memoryview_input = memoryview(input_buffer) 1477 1478 result = (ok, possible_messages_in_client_input_fifo) 1479 return result 1480 1481 # @profile 1482 def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket): 1483 possible_messages_in_client_input_fifo = False 1484 1485 curr_client_id = self._connection_by_conn[readable_socket] 1486 curr_client_info = self._connections[curr_client_id] 1487 1488 ok = True 1489 while ok: 1490 try: 1491 if self.use_speed_optimized_socket_read: 1492 ok, possible_messages_in_client_input_fifo = \ 1493 self._read_data_from_already_connected_socket__inner__speed_optimized( 1494 curr_client_info, possible_messages_in_client_input_fifo) 1495 else: 1496 ok, possible_messages_in_client_input_fifo = \ 1497 self._read_data_from_already_connected_socket__inner__memory_optimized( 1498 curr_client_info, possible_messages_in_client_input_fifo) 1499 break # makes IO faster on 10-30% on all modes (message/raw, data/http, 1500 # static_read_buffer/non_static_read_buffer). 1501 # Ускорение происходит за счет того, что: 1) данных оказывается не настолько много чтобы кеш процессора 1502 # переполнялся и требовалась бы работа с оперативной памятью; 2) при таком разбиении ввод и вывод 1503 # начинают происходить (на уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и 1504 # пустили в обработку. В это время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы. 1505 # После обработки мы пускаем данные на отправку и забираем очередную порцию данных из входного буфера. 1506 # В это время происходит отправка данных из буферов на уровне ОС. И т.д. 1507 except BlockingIOError as err: 1508 ok = False 1509 except InterruptedError as err: 1510 pass 1511 except ConnectionError as err: 1512 # An established connection was aborted by the software in your host machine 1513 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1514 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1515 curr_client_info.addr.result, err.errno, err.strerror)) 1516 self._mark_connection_to_be_closed_immediately(curr_client_info) 1517 ok = False 1518 except (socket.error, OSError) as err: 1519 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1520 ok = False 1521 elif errno.EINTR == err.errno: 1522 pass 1523 elif err.errno in SET_OF_CONNECTION_ERRORS: 1524 # An established connection was aborted by the software in your host machine 1525 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1526 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1527 curr_client_info.addr.result, err.errno, err.strerror)) 1528 self._mark_connection_to_be_closed_immediately(curr_client_info) 1529 ok = False 1530 else: 1531 if 'nt' == os.name: 1532 if errno.WSAECONNRESET == err.errno: 1533 # An existing connection was forcibly closed by the remote host 1534 if __debug__: self._log( 1535 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1536 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1537 curr_client_info.addr.result, err.errno, err.strerror)) 1538 self._mark_connection_to_be_closed_immediately(curr_client_info) 1539 ok = False 1540 elif errno.EHOSTUNREACH == err.errno: 1541 # OSError: [Errno 113] No route to host 1542 if __debug__: self._log( 1543 'CLOSING {}: No route to host'.format(curr_client_info.addr.result)) 1544 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1545 curr_client_info.addr.result, err.errno, err.strerror)) 1546 self._mark_connection_to_be_closed_immediately(curr_client_info) 1547 ok = False 1548 else: 1549 if __debug__: self._log( 1550 'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result)) 1551 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1552 curr_client_info.addr.result, err.errno, err.strerror)) 1553 raise err 1554 else: 1555 if errno.WSAEHOSTUNREACH == err.errno: 1556 # OSError: [Errno 113] No route to host 1557 if __debug__: self._log( 1558 'CLOSING {}: No route to host'.format(curr_client_info.addr.result)) 1559 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1560 curr_client_info.addr.result, err.errno, err.strerror)) 1561 self._mark_connection_to_be_closed_immediately(curr_client_info) 1562 ok = False 1563 else: 1564 if __debug__: self._log( 1565 'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result)) 1566 if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format( 1567 curr_client_info.addr.result, err.errno, err.strerror)) 1568 self._mark_connection_to_be_closed_immediately(curr_client_info) 1569 ok = False 1570 # raise err 1571 1572 read_is_forbidden = False 1573 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 1574 > self.global_in__data_size_limit.result: 1575 read_is_forbidden = True 1576 ok = False 1577 1578 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 1579 read_is_forbidden) 1580 if read_is_forbidden_test is not None: 1581 if read_is_forbidden_test: 1582 print('Read is suppressed until data will be processed.') 1583 else: 1584 print('Read is allowed: data is processed.') 1585 1586 return possible_messages_in_client_input_fifo 1587 1588 # @profile 1589 def _read_data_from_socket(self, readable_socket: socket.socket): 1590 possible_messages_in_client_input_fifo = False 1591 if readable_socket in self._gate: 1592 accept_is_forbidden = True 1593 if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \ 1594 <= self.global__data_size_limit.result: 1595 accept_is_forbidden = False 1596 self._accept_new_connection(readable_socket) 1597 1598 if __debug__: 1599 accept_is_forbidden_test = \ 1600 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger( 1601 accept_is_forbidden) 1602 if accept_is_forbidden_test is not None: 1603 if accept_is_forbidden_test: 1604 print('Accept is suppressed until data will be processed and/or out.') 1605 else: 1606 print('Accept is allowed: data is processed and/or out.') 1607 else: 1608 possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell( 1609 readable_socket) 1610 1611 return possible_messages_in_client_input_fifo 1612 1613 def _log_new_connection(self, client_info: Connection, is_incoming_connection): 1614 connection = client_info.conn.result 1615 client_address = client_info.addr.result 1616 addr_info = client_info.addr_info 1617 host_names = client_info.host_names 1618 1619 connection_type_string = 'OUTGOING' 1620 from_to = 'to' 1621 if is_incoming_connection: 1622 connection_type_string = 'INCOMING' 1623 from_to = 'from' 1624 1625 if __debug__: self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address)) 1626 if connection.family in {socket.AF_INET, socket.AF_INET6}: 1627 if __debug__: self._log('\taddr_info: {}'.format(addr_info)) 1628 if __debug__: self._log('\thost_names: {}'.format(host_names)) 1629 1630 # @profile 1631 def _write_data_to_socket(self, curr_client_info: Connection): 1632 # CAUTION: code here is optimized for speed - not for readability or beauty. 1633 1634 expected_client_info = curr_client_info.connected_expected_client 1635 writable_socket = curr_client_info.conn.result 1636 if curr_client_info.should_be_closed: 1637 curr_client_info.current_memoryview_output = None 1638 curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client) 1639 return 1640 1641 ok = True 1642 first_pass = True 1643 can_call__inline_processor__on__output_buffers_are_empty = True 1644 while ok: 1645 try: 1646 if curr_client_info.current_memoryview_output: 1647 nsent = writable_socket.send(curr_client_info.current_memoryview_output) 1648 curr_client_info.current_memoryview_output = curr_client_info.current_memoryview_output[nsent:] 1649 else: 1650 curr_client_info.current_memoryview_output = None 1651 output_fifo_size = curr_client_info.output_to_client.size() 1652 if output_fifo_size > 1: 1653 result_data, result_size, result_qnt = \ 1654 curr_client_info.output_to_client.get_at_least_size(524288) 1655 if result_qnt > 1: 1656 curr_client_info.current_memoryview_output = memoryview(b''.join(result_data)) 1657 else: 1658 curr_client_info.current_memoryview_output = memoryview(result_data.popleft()) 1659 elif output_fifo_size == 1: 1660 curr_client_info.current_memoryview_output = memoryview(curr_client_info.output_to_client.get()) 1661 1662 if curr_client_info.current_memoryview_output is None: 1663 # if curr_client_info.ready_to_be_closed: 1664 # if first_pass: 1665 # # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1666 # # Это значит что все данные были отправлены, и можно закрывать соединение. 1667 # self._output_check_sockets.remove(writable_socket) 1668 # self._mark_connection_to_be_closed_immediately(curr_client_info) 1669 # # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1670 # # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1671 # # сохранить сокет в списке проверяемых для отправки. 1672 # else: 1673 # self._output_check_sockets.remove(writable_socket) 1674 if not curr_client_info.ready_to_be_closed: 1675 # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда 1676 # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо 1677 # сохранить сокет в списке проверяемых для отправки. 1678 self._output_check_sockets.remove(writable_socket) 1679 if first_pass and curr_client_info.ready_to_be_closed: 1680 # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально. 1681 # Это значит что все данные были отправлены, и можно закрывать соединение. 1682 self._output_check_sockets.remove(writable_socket) 1683 self._mark_connection_to_be_closed_immediately(curr_client_info) 1684 ok = False 1685 if expected_client_info: 1686 self._io_iteration_result.clients_with_empty_output_fifo.add( 1687 curr_client_info.connected_expected_client_id) 1688 if curr_client_info.has_inline_processor: 1689 if can_call__inline_processor__on__output_buffers_are_empty: 1690 if self._inline_processor__on__output_buffers_are_empty(curr_client_info, 1691 expected_client_info): 1692 ok = True 1693 can_call__inline_processor__on__output_buffers_are_empty = False 1694 except BlockingIOError as err: 1695 ok = False 1696 except InterruptedError as err: 1697 pass 1698 except ConnectionError as err: 1699 # An established connection was aborted by the software in your host machine 1700 if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1701 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1702 curr_client_info.addr.result, err.errno, err.strerror)) 1703 self._mark_connection_to_be_closed_immediately(curr_client_info) 1704 ok = False 1705 except (socket.error, OSError) as err: 1706 if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno): 1707 ok = False 1708 elif errno.EINTR == err.errno: 1709 pass 1710 elif err.errno in SET_OF_CONNECTION_ERRORS: 1711 # Connection reset by peer 1712 if __debug__: self._log( 1713 'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result, 1714 err.strerror)) 1715 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1716 curr_client_info.addr.result, err.errno, err.strerror)) 1717 self._mark_connection_to_be_closed_immediately(curr_client_info) 1718 ok = False 1719 else: 1720 if 'nt' == os.name: 1721 if errno.WSAECONNRESET == err.errno: 1722 # An existing connection was forcibly closed by the remote host 1723 if __debug__: self._log( 1724 'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result)) 1725 if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format( 1726 curr_client_info.addr.result, err.errno, err.strerror)) 1727 self._mark_connection_to_be_closed_immediately(curr_client_info) 1728 ok = False 1729 else: 1730 raise err 1731 else: 1732 raise err 1733 first_pass = False 1734 1735 def _mark_connection_to_be_closed_immediately(self, client_info: Connection): 1736 client_info.should_be_closed = True 1737 client_info.current_memoryview_input = None 1738 self._connections_marked_to_be_closed_immediately.add(client_info.conn.result) 1739 if client_info.connected_expected_client_id is not None: 1740 self._io_iteration_result.clients_with_disconnected_connection.add( 1741 client_info.connected_expected_client_id) 1742 1743 def _mark_connection_as_ready_to_be_closed(self, client_info: Connection): 1744 client_info.ready_to_be_closed = True 1745 1746 def _mark_connection_as_ready_for_deletion(self, client_info: Connection): 1747 client_info.ready_for_deletion = True 1748 self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result) 1749 1750 def _handle_connection_error(self, writable_socket: socket.socket): 1751 # Data read from already connected client 1752 curr_client_id = self._connection_by_conn[writable_socket] 1753 curr_client_info = self._connections[curr_client_id] 1754 if curr_client_info.should_be_closed: 1755 return 1756 if __debug__: self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result)) 1757 self._mark_connection_to_be_closed_immediately(curr_client_info) 1758 1759 # @profile 1760 def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection): 1761 result = False 1762 1763 try: 1764 while True: 1765 if curr_client_info.current_message_length is None: 1766 current_message_length = curr_client_info.raw_input_from_client.get_data(self.message_size_len) 1767 if current_message_length is None: 1768 break 1769 1770 curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little') 1771 1772 current_message = curr_client_info.raw_input_from_client.get_data( 1773 curr_client_info.current_message_length) 1774 if current_message is None: 1775 break 1776 else: 1777 curr_client_info.input_from_client.put(current_message) 1778 curr_client_info.current_message_length = None 1779 result = True 1780 except TypeError: 1781 pass 1782 1783 return result 1784 1785 def _send_message_through_connection(self, client_info: Connection, data): 1786 if client_info.conn.existence: 1787 client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little')) 1788 client_info.output_to_client.put(data) 1789 1790 self._output_check_sockets.add(client_info.conn.result) 1791 else: 1792 if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1793 raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!') 1794 1795 def _generate_list_of_messages_with_their_length(self, messages_list): 1796 for message in messages_list: 1797 yield len(message).to_bytes(self.message_size_len, 'little') 1798 yield message 1799 1800 def _send_messages_through_connection(self, client_info: Connection, messages_list): 1801 if client_info.conn.existence: 1802 client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list)) 1803 1804 self._output_check_sockets.add(client_info.conn.result) 1805 else: 1806 if __debug__: self._log( 1807 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1808 raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!') 1809 1810 def _send_message_through_connection_raw(self, client_info: Connection, data): 1811 if client_info.conn.existence: 1812 client_info.output_to_client.put(data) 1813 self._output_check_sockets.add(client_info.conn.result) 1814 else: 1815 if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data)) 1816 raise Exception( 1817 'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1818 1819 def _send_messages_through_connection_raw(self, client_info: Connection, messages_list): 1820 if client_info.conn.existence: 1821 client_info.output_to_client.extend(messages_list) 1822 self._output_check_sockets.add(client_info.conn.result) 1823 else: 1824 if __debug__: self._log( 1825 'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list)) 1826 raise Exception( 1827 'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!') 1828 1829 def _move_message_from_fifo_to_memoryview(self, client_info: Connection): 1830 if client_info.current_memoryview_output is None: 1831 if client_info.output_to_client.size(): 1832 client_info.current_memoryview_output = memoryview(client_info.output_to_client.get()) 1833 1834 # @profile 1835 def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection): 1836 output_fifo_size = client_info.output_to_client.size() 1837 if output_fifo_size > 1: 1838 result_data, result_size, result_qnt = \ 1839 client_info.output_to_client.get_at_least_size(524288) 1840 if result_qnt > 1: 1841 client_info.current_memoryview_output = memoryview(b''.join(result_data)) 1842 else: 1843 client_info.current_memoryview_output = memoryview(result_data.popleft()) 1844 elif output_fifo_size == 1: 1845 client_info.current_memoryview_output = memoryview(client_info.output_to_client.get()) 1846 1847 def _process_client_keyword(self, client_socket: socket.socket): 1848 curr_client_id = self._connection_by_conn[client_socket] 1849 curr_client_info = self._connections[curr_client_id] 1850 1851 if curr_client_info.input_from_client.size() >= 0: 1852 expected_client_id = None 1853 expected_client_info = None 1854 1855 this_is_super_server_client = False 1856 if curr_client_info.connected_expected_client is not None: 1857 expected_client_info = curr_client_info.connected_expected_client 1858 expected_client_id = expected_client_info.id 1859 if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role: 1860 this_is_super_server_client = True 1861 1862 if this_is_super_server_client: 1863 # This is connection to Super-Server. So we expect an answer like b'OK' 1864 super_server_answer__keyword_accepted = curr_client_info.input_from_client.get() 1865 super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted) 1866 if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted: 1867 # Answer was acceptable 1868 self._unconfirmed_clients.remove(client_socket) 1869 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1870 if expected_client_info.will_use_raw_client_connection: 1871 curr_client_info.this_is_raw_connection = True 1872 else: 1873 # Answer was NOT acceptable 1874 self._mark_connection_to_be_closed_immediately(curr_client_info) 1875 self._mark_connection_as_ready_for_deletion(curr_client_info) 1876 if __debug__: self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format( 1877 super_server_answer__keyword_accepted)) 1878 else: 1879 # This is connection to client. So we expect a keyword 1880 keyword = curr_client_info.input_from_client.get() 1881 keyword = bytes(keyword) 1882 curr_client_info.keyword = keyword 1883 self._unconfirmed_clients.remove(client_socket) 1884 self._send_message_through_connection(curr_client_info, self.server_answer__keyword_accepted) 1885 1886 if keyword in self._keywords_for_expected_clients: 1887 # empty expected client was already registered 1888 expected_client_id = self._keywords_for_expected_clients[keyword] 1889 expected_client_info = self._expected_clients[expected_client_id] 1890 expected_client_info.connection_id = curr_client_id 1891 expected_client_info._Client__connection = curr_client_info 1892 self._conns_of_expected_clients[client_socket] = expected_client_id 1893 curr_client_info.connected_expected_client_id = expected_client_id 1894 curr_client_info.connected_expected_client = expected_client_info 1895 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id) 1896 if expected_client_info.will_use_raw_client_connection: 1897 curr_client_info.this_is_raw_connection = True 1898 else: 1899 # it is unknown expected client 1900 if self.unexpected_clients_are_allowed: 1901 self._add_unexpected_client(curr_client_id, keyword) 1902 else: 1903 self._mark_connection_to_be_closed_immediately(curr_client_info) 1904 self._mark_connection_as_ready_for_deletion(curr_client_info) 1905 1906 def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1907 client_info = self._connections[self._connection_by_conn[readable_socket]] 1908 if client_info.connected_expected_client_id is not None: 1909 if client_info.input_from_client.size(): 1910 self._io_iteration_result.clients_have_data_to_read.add( 1911 client_info.connected_expected_client_id) 1912 if client_info.has_inline_processor: 1913 self._inline_processor__on__data_received(client_info) 1914 1915 def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket): 1916 if readable_socket in self._conns_of_expected_clients: 1917 expected_client_id = self._conns_of_expected_clients[readable_socket] 1918 expected_client = self._expected_clients[expected_client_id] 1919 self._io_iteration_result.clients_have_data_to_read.add(expected_client_id) 1920 client_info = expected_client._Client__connection 1921 if client_info.has_inline_processor: 1922 self._inline_processor__on__data_received(client_info) 1923 1924 def _inline_processor__apply_parameters(self, connection_info: Connection, 1925 expected_client: Client): 1926 inline_processor = expected_client.obj_for_inline_processing 1927 1928 inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode 1929 connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode 1930 1931 if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately: 1932 inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False 1933 self._mark_connection_to_be_closed_immediately(connection_info) 1934 1935 if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed: 1936 inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False 1937 self._mark_connection_as_ready_to_be_closed(connection_info) 1938 1939 def _inline_processor__init_parameters(self, connection_info: Connection, 1940 expected_client: Client): 1941 inline_processor = expected_client.obj_for_inline_processing 1942 1943 inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection 1944 inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection 1945 1946 def _inline_processor__on__data_received(self, connection_info: Connection): 1947 expected_client = connection_info.connected_expected_client 1948 inline_processor = expected_client.obj_for_inline_processing 1949 1950 try: 1951 while connection_info.input_from_client.size(): 1952 inline_processor.on__data_received(connection_info.input_from_client.get()) 1953 1954 if inline_processor.output_messages: 1955 while inline_processor.output_messages: 1956 another_message = inline_processor.output_messages.popleft() 1957 if not connection_info.this_is_raw_connection: 1958 connection_info.output_to_client.put( 1959 len(another_message).to_bytes(self.message_size_len, 'little')) 1960 connection_info.output_to_client.put(another_message) 1961 self._output_check_sockets.add(connection_info.conn.result) 1962 if connection_info.output_to_client.get_data_full_size() >= 65536: 1963 self._write_data_to_socket(connection_info) 1964 return True 1965 except: 1966 self.remove_client(expected_client.id) 1967 exc = sys.exc_info() 1968 exception = exc 1969 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 1970 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 1971 exception = exception[:2] + (formatted_traceback,) 1972 trace_str = ''.join(exception[2]) 1973 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 1974 if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string)) 1975 1976 return False 1977 1978 def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection, 1979 expected_client: Client): 1980 inline_processor = expected_client.obj_for_inline_processing 1981 1982 if not connection_info.has_inline_processor: 1983 return False 1984 1985 try: 1986 inline_processor.on__output_buffers_are_empty() 1987 1988 if inline_processor.output_messages: 1989 while inline_processor.output_messages: 1990 another_message = inline_processor.output_messages.popleft() 1991 if not connection_info.this_is_raw_connection: 1992 connection_info.output_to_client.put( 1993 len(another_message).to_bytes(self.message_size_len, 'little')) 1994 connection_info.output_to_client.put(another_message) 1995 self._output_check_sockets.add(connection_info.conn.result) 1996 if connection_info.output_to_client.get_data_full_size() >= 65536: 1997 # self._write_data_to_socket(connection_info.conn.result) 1998 self._write_data_to_socket(connection_info) 1999 return True 2000 except: 2001 self.remove_client(expected_client.id) 2002 exc = sys.exc_info() 2003 exception = exc 2004 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2005 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2006 exception = exception[:2] + (formatted_traceback,) 2007 trace_str = ''.join(exception[2]) 2008 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2009 if __debug__: self._log( 2010 'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string)) 2011 2012 return False 2013 2014 def _inline_processor__on__connection_lost(self, connection_info: Connection, 2015 expected_client: Client): 2016 inline_processor = expected_client.obj_for_inline_processing 2017 2018 if not connection_info.has_inline_processor: 2019 return False 2020 2021 try: 2022 inline_processor.on__connection_lost() 2023 except: 2024 exc = sys.exc_info() 2025 exception = exc 2026 error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0])) 2027 formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2]) 2028 exception = exception[:2] + (formatted_traceback,) 2029 trace_str = ''.join(exception[2]) 2030 result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str) 2031 if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string)) 2032 self.remove_client(expected_client.id) 2033 2034 def _inline_processor__on__connection_lost_by_connection_id(self, connection_id): 2035 connection_info = self._connections[connection_id] 2036 expected_client_info = connection_info.connected_expected_client 2037 if (expected_client_info is not None) and connection_info.has_inline_processor: 2038 self._inline_processor__on__connection_lost(connection_info, expected_client_info) 2039 self._io_iteration_result.clients_with_disconnected_connection.remove( 2040 expected_client_info.id) 2041 2042 def _unlink_good_af_unix_sockets(self): 2043 if 'posix' == os.name: 2044 for gate_connection_settings in self.gates_connections_settings: 2045 if gate_connection_settings.socket_family == socket.AF_UNIX: 2046 try: 2047 os.unlink(gate_connection_settings.socket_address) 2048 except: 2049 if __debug__: self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format( 2050 gate_connection_settings.socket_address)) 2051 raise 2052 2053 def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings): 2054 if 'posix' == os.name: 2055 if connection_settings.socket_family == socket.AF_UNIX: 2056 if os.path.exists(connection_settings.socket_address): 2057 if __debug__: self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format( 2058 connection_settings.socket_address))
293 def __init__(self, gates_connections_settings: Set[ConnectionSettings]): 294 """ 295 Port should not be open to a external world! 296 :param gates_connections_settings: set() of ConnectionSettings() 297 :return: 298 """ 299 super(ASockIOCore, self).__init__() 300 301 if os.name != 'nt': 302 self.po = select.poll() 303 self.last_all_sockets = set() # type: Set[int] 304 self.socket_by_fd = dict() # type: Dict[int, socket.socket] 305 306 self.check_sockets_sum_time = 0.0 307 self.check_sockets_qnt = 0 308 self.check_sockets_max_time = 0.0 309 310 self.gates_connections_settings = gates_connections_settings 311 if not self.gates_connections_settings: 312 self.gates_connections_settings = set() 313 # raise Exception('gates_connections_settings should be provided!') 314 for gates_connections_settings in self.gates_connections_settings: 315 gates_connections_settings.check() 316 self.faulty_connection_settings = set() 317 self._connection_settings_by_gate_conn = dict() 318 319 self.set_of_gate_addresses = set() 320 self._gate = set() 321 self.reuse_gate_addr = False 322 self.reuse_gate_port = False 323 324 self.message_size_len = MESSAGE_SIZE_LEN 325 self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED 326 327 self._connections = dict() # key: ID; data: Connection() 328 self._connection_by_conn = dict() # key: conn; data: ID 329 self._connections_id_gen = IDGenerator() 330 331 self._connections_marked_as_ready_to_be_closed = set() 332 self._connections_marked_to_be_closed_immediately = set() 333 self._connections_marked_as_ready_to_be_deleted = set() 334 335 self._unconfirmed_clients = set() 336 337 self._we_have_connections_for_select = False 338 self._input_check_sockets = set() 339 self._output_check_sockets = set() 340 self._exception_check_sockets = set() 341 342 # ID (GUID) и другая информация клиентов, подключение которых явно ожидается 343 self._expected_clients = dict() # key: ID; data: Client() 344 self._expected_clients_id_gen = IDGenerator() 345 self._keywords_for_expected_clients = dict() # key: keyword; data: info 346 self._conns_of_expected_clients = dict() # key: conn; data expected_client_ID 347 348 self.unexpected_clients_are_allowed = True 349 350 # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в 351 # список ожидаемых клиентов - данный клиент будет автоматически подхвачен. 352 self._unexpected_clients = dict() # key: ID; data: Client() 353 self._unexpected_clients_id_gen = IDGenerator() 354 self._keywords_of_unexpected_clients = dict() 355 self._conns_of_unexpected_clients = dict() 356 357 self._io_iteration_result = IoIterationResult() 358 359 self.raw_checker_for_new_incoming_connections = CheckIsRawConnection() 360 self.need_to_auto_check_incoming_raw_connection = False 361 self.unknown_clients_are_allowed = False 362 self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string) 363 self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: ' 364 365 self.echo_log = False 366 self._internal_log = deque() 367 368 self.recv_sizes = deque() 369 self.recv_buff_sizes = deque() 370 371 self.should_get_client_addr_info_on_connection = True 372 373 self.use_nodelay_inet = False 374 self.use_speed_optimized_socket_read = False 375 376 self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \ 377 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 378 self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \ 379 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 380 self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \ 381 FrontTriggerableVariable(FrontTriggerableVariableType.equal, True) 382 383 self.class_for_unknown_clients_inline_processing = None 384 385 self._clients_with_inline_processors_that_need_to_apply_parameters = set()
Port should not be open to a external world! :param gates_connections_settings: set() of ConnectionSettings() :return:
387 @staticmethod 388 def check_sockets_select(read: Set[int], write: Set[int], error: Set[int], 389 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 390 all_sockets = read | write | error 391 if all_sockets: 392 return select.select(read, 393 write, 394 error, 395 timeout) 396 else: 397 return set(), set(), set()
399 def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int], 400 timeout: float)->Tuple[Set[int], Set[int], Set[int]]: 401 read_events = select.POLLIN | select.POLLPRI 402 write_events = select.POLLOUT 403 except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL 404 if hasattr(select, 'POLLRDHUP'): 405 except_events |= select.POLLRDHUP 406 readable_events = {select.POLLIN, select.POLLPRI} 407 writable_events = {select.POLLOUT} 408 exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL} 409 if hasattr(select, 'POLLRDHUP'): 410 exceptional_events.add(select.POLLRDHUP) 411 all_events_set = readable_events | writable_events | exceptional_events 412 413 timeout = int(timeout * 1000) 414 415 # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets)) 416 all_sockets = read | write | error 417 # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets)) 418 new_sockets = all_sockets - self.last_all_sockets 419 # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets)) 420 still_sockets = all_sockets & self.last_all_sockets 421 # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets)) 422 deleted_sockets = self.last_all_sockets - all_sockets 423 # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets)) 424 self.last_all_sockets = all_sockets 425 426 for socket_fd in new_sockets: 427 event_mask = 0 428 if socket_fd in read: 429 event_mask |= read_events 430 if socket_fd in write: 431 event_mask |= write_events 432 if socket_fd in error: 433 event_mask |= except_events 434 # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 435 self.po.register(socket_fd, event_mask) 436 437 for socket_fd in still_sockets: 438 event_mask = 0 439 if socket_fd in read: 440 event_mask |= read_events 441 if socket_fd in write: 442 event_mask |= write_events 443 if socket_fd in error: 444 event_mask |= except_events 445 # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask)) 446 self.po.modify(socket_fd, event_mask) 447 448 for socket_fd in deleted_sockets: 449 # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd)) 450 self.po.unregister(socket_fd) 451 452 poll_result = self.po.poll(timeout) 453 # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result)) 454 # sys.stdout.flush() 455 456 readable = set() 457 writable = set() 458 exceptional = set() 459 for socket_fd, event_mask in poll_result: 460 socket_events_set = set() 461 for another_event in all_events_set: 462 if event_mask & another_event: 463 socket_events_set.add(another_event) 464 465 if socket_events_set & readable_events: 466 readable.add(socket_fd) 467 if socket_events_set & writable_events: 468 writable.add(socket_fd) 469 if socket_events_set & exceptional_events: 470 exceptional.add(socket_fd) 471 472 return readable, writable, exceptional
474 def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket], 475 timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]: 476 all_sockets = read | write | error 477 if all_sockets: 478 read_fd = set() 479 write_fd = set() 480 error_fd = set() 481 for conn in read: 482 read_fd.add(conn.fileno()) 483 for conn in write: 484 write_fd.add(conn.fileno()) 485 for conn in error: 486 error_fd.add(conn.fileno()) 487 488 check_sockets = self.check_sockets_select 489 if os.name != 'nt': 490 check_sockets = self.check_sockets_poll 491 492 readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd, 493 write_fd, 494 error_fd, 495 timeout) 496 readable = set() 497 writable = set() 498 exceptional = set() 499 for fd in readable_fd: 500 readable.add(self.socket_by_fd[fd]) 501 for fd in writable_fd: 502 writable.add(self.socket_by_fd[fd]) 503 for fd in exceptional_fd: 504 exceptional.add(self.socket_by_fd[fd]) 505 return readable, writable, exceptional 506 else: 507 return set(), set(), set()
509 def gate_io_iteration(self, timeout=0.0): 510 result = self._io_iteration_result 511 if self._gate: 512 readable, writable, exceptional = self.check_sockets_select(self._gate, 513 set(), 514 set(), 515 timeout) 516 517 # Handle inputs 518 for s in readable: 519 self._read_data_from_socket(s) 520 521 self._io_iteration_result = IoIterationResult() 522 return result
525 def io_iteration(self, timeout=0.0): 526 """ 527 528 :param timeout: timeout in seconds 529 :return: 530 """ 531 result = self._io_iteration_result 532 533 if self._we_have_connections_for_select: 534 # need_to_process = False 535 # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets 536 # if not (all_sockets - self._gate): 537 # timeout = 0.01 538 539 need_to_repeat = True 540 541 while need_to_repeat: 542 output_check_sockets = set() 543 544 # Is need to check writable sockets 545 need_to_check_writable_sockets = False 546 for s in self._output_check_sockets: 547 curr_client_info = self._connections[self._connection_by_conn[s]] 548 if curr_client_info.output_to_client.size(): 549 need_to_check_writable_sockets = True 550 break 551 552 if need_to_check_writable_sockets: 553 output_check_sockets = self._output_check_sockets 554 555 # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(), 556 # len(self._input_check_sockets), 557 # len(self._output_check_sockets), 558 # len(self._exception_check_sockets))) 559 # sys.stdout.flush() 560 check_sockets_start_time = time.perf_counter() 561 readable, writable, exceptional = self.check_sockets(self._input_check_sockets, 562 output_check_sockets, 563 self._exception_check_sockets, 564 timeout) 565 check_sockets_finish_time = time.perf_counter() 566 check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time 567 self.check_sockets_sum_time += check_sockets_delta_time 568 self.check_sockets_qnt += 1 569 if self.check_sockets_max_time < check_sockets_delta_time: 570 self.check_sockets_max_time = check_sockets_delta_time 571 check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt 572 # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format( 573 # check_sockets_delta_time, 574 # check_socket_average_time, 575 # self.check_sockets_sum_time, 576 # self.check_sockets_max_time 577 # )) 578 # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(), 579 # len(readable), 580 # len(writable), 581 # len(exceptional))) 582 # sys.stdout.flush() 583 584 read_is_forbidden = True 585 if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \ 586 <= self.global_in__data_size_limit.result: 587 read_is_forbidden = False 588 589 # Handle inputs 590 for s in readable: 591 read_result = self._read_data_from_socket(s) 592 if read_result: 593 if s in self._unconfirmed_clients: 594 self._process_client_keyword(s) 595 self._check_is_client_have_data_to_read_in_fifo(s) 596 else: 597 self._client_have_data_to_read_in_fifo(s) 598 599 if __debug__: 600 read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger( 601 read_is_forbidden) 602 if read_is_forbidden_test is not None: 603 if read_is_forbidden_test: 604 print('Read is suppressed until data will be processed.') 605 else: 606 print('Read is allowed: data is processed.') 607 608 # Handle outputs 609 for s in writable: 610 curr_client_info = self._connections[self._connection_by_conn[s]] 611 self._write_data_to_socket(curr_client_info) 612 # self._write_data_to_socket(s) 613 614 # Handle "exceptional conditions" 615 for s in exceptional: 616 self._handle_connection_error(s) 617 618 # Set parameters for inline processors 619 if self._clients_with_inline_processors_that_need_to_apply_parameters: 620 for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters: 621 expected_client_info = self._expected_clients[ec_id] 622 connection_info = expected_client_info._Client__connection 623 self._inline_processor__apply_parameters(connection_info, expected_client_info) 624 self._clients_with_inline_processors_that_need_to_apply_parameters.clear() 625 626 # Close sockets 627 if self._connections_marked_to_be_closed_immediately: 628 sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately 629 self._connections_marked_to_be_closed_immediately = set() 630 for closeable_socket in sockets_should_be_closed_immediately: 631 connection_id = self._connection_by_conn[closeable_socket] 632 # self.close_connection_by_conn(closeable_socket) 633 self.close_connection(connection_id) 634 self._inline_processor__on__connection_lost_by_connection_id(connection_id) 635 636 # Removing clients 637 if self._connections_marked_as_ready_to_be_deleted: 638 clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted 639 self._connections_marked_as_ready_to_be_deleted = set() 640 for faulty_socket in clients_ready_to_be_deleted: 641 self.remove_connection_by_conn(faulty_socket) 642 643 if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \ 644 <= self.global_out__data_size_limit.result: 645 need_to_repeat = False 646 else: 647 need_to_repeat = True 648 649 need_to_repeat = False 650 651 if __debug__: 652 need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger( 653 need_to_repeat) 654 if need_to_repeat_show is not None: 655 if need_to_repeat_show: 656 print('Work is suppressed until data will be out.') 657 else: 658 print('Work is allowed: data is out.') 659 660 self._io_iteration_result = IoIterationResult() 661 return result
:param timeout: timeout in seconds :return:
663 def listen(self, backlog=1): 664 # backlog = backlog or 1 665 666 new_connection_settings = set() 667 for gate_connection_settings in self.gates_connections_settings: 668 gate = None 669 try: 670 gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type, 671 gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno) 672 self.socket_by_fd[gate.fileno()] = gate 673 gate.setblocking(0) 674 # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 675 except (socket.error, OSError) as err: 676 gate = None 677 if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format( 678 err.errno, err.strerror)) 679 continue 680 681 if self.reuse_gate_port: 682 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 683 684 try: 685 self._check_for_initial_af_unix_socket_unlink(gate_connection_settings) 686 gate.bind(gate_connection_settings.socket_address) 687 except (socket.error, OSError) as err: 688 del self.socket_by_fd[gate.fileno()] 689 gate.close() 690 gate = None 691 if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format( 692 gate_connection_settings.socket_address, err.errno, err.strerror)) 693 continue 694 try: 695 gate.listen(backlog) 696 except (socket.error, OSError) as err: 697 del self.socket_by_fd[gate.fileno()] 698 gate.close() 699 gate = None 700 if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format( 701 gate_connection_settings.socket_address, err.errno, err.strerror)) 702 continue 703 704 if self.reuse_gate_addr: 705 gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 706 707 self._input_check_sockets.add(gate) 708 self._exception_check_sockets.add(gate) 709 710 if gate: 711 self._gate.add(gate) 712 if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS: 713 self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0]) 714 elif socket.AF_UNIX == gate_connection_settings.socket_family: 715 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 716 else: 717 self.set_of_gate_addresses.add(gate_connection_settings.socket_address) 718 self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY') 719 self._connection_settings_by_gate_conn[gate] = gate_connection_settings 720 self._we_have_connections_for_select = True 721 new_connection_settings.add(gate_connection_settings) 722 else: 723 self.faulty_connection_settings.add(gate_connection_settings) 724 self.gates_connections_settings = new_connection_settings 725 726 return len(self.gates_connections_settings)
739 def close(self): 740 for gate in self._gate: 741 del self.socket_by_fd[gate.fileno()] 742 gate.close() 743 744 if gate in self._input_check_sockets: 745 self._input_check_sockets.remove(gate) 746 if gate in self._exception_check_sockets: 747 self._exception_check_sockets.remove(gate) 748 749 if not self._input_check_sockets: 750 self._we_have_connections_for_select = False 751 self._unlink_good_af_unix_sockets()
758 def add_client(self, expected_client_info: Client): 759 """ 760 Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в 761 будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. 762 При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет 763 зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. 764 Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - 765 перед закрытием и уничтожением сервера) цикл обработки io_iteration(). 766 :param expected_client_info: link to Client() 767 :return: expected_client_id 768 """ 769 if (expected_client_info.connection_settings.keyword is None) \ 770 and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role): 771 raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!') 772 773 if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients: 774 raise Exception('Expected Client with keyword "{}" is already registered!'.format( 775 expected_client_info.connection_settings.keyword)) 776 777 expected_client_info.id = self._expected_clients_id_gen() 778 779 if self.unexpected_clients_are_allowed: 780 if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients: 781 # клиент уже подключен 782 unexpected_client_id = self._keywords_of_unexpected_clients[ 783 expected_client_info.connection_settings.keyword] 784 unexpected_client_info = self._unexpected_clients[unexpected_client_id] 785 connection_info = expected_client_info._Client__connection 786 if ( 787 unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \ 788 (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role): 789 # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже 790 # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении 791 # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся 792 # подключение. 793 # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически 794 # отключен и соединение будет установлено в новом сокете. 795 expected_client_info.connection_id = unexpected_client_info.connection_id 796 expected_client_info._Client__connection = \ 797 unexpected_client_info._Client__connection 798 self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id 799 connection_info.connected_expected_client_id = expected_client_info.id 800 connection_info.connected_expected_client = expected_client_info 801 self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id) 802 else: 803 # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже 804 # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером. 805 # Необходимо его отключить. 806 self._mark_connection_to_be_closed_immediately(connection_info) 807 self._mark_connection_as_ready_for_deletion(connection_info) 808 self._remove_unexpected_client(unexpected_client_id) 809 else: 810 # клиент еще не подключен 811 expected_client_info.connection_id = None 812 expected_client_info._Client__connection = None 813 814 if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role: 815 self._connect_to_super_server(expected_client_info) 816 817 self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id 818 self._expected_clients[expected_client_info.id] = expected_client_info 819 820 return expected_client_info.id
Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - перед закрытием и уничтожением сервера) цикл обработки io_iteration(). :param expected_client_info: link to Client() :return: expected_client_id
822 def get_client_id_by_keyword(self, expected_client_keyword): 823 """ 824 :param expected_client_keyword: expected_client_keyword 825 :return: link to Client() 826 """ 827 return self._keywords_for_expected_clients[expected_client_keyword]
:param expected_client_keyword: expected_client_keyword :return: link to Client()
829 def get_client_info(self, expected_client_id): 830 """ 831 :param expected_client_id: expected_client_id 832 :return: link to Client() 833 """ 834 return self._expected_clients[expected_client_id]
:param expected_client_id: expected_client_id :return: link to Client()
836 def get_connection_input_fifo_size_for_client(self, expected_client_id): 837 expected_client_info = self._expected_clients[expected_client_id] 838 connection_info = expected_client_info._Client__connection 839 if connection_info is None: 840 raise Exception('Expected client was not connected yet!') 841 # if client_info.this_is_raw_connection: 842 # if client_info.input_from_client.size(): 843 # return 1 844 # else: 845 # return 0 846 # else: 847 # return client_info.input_from_client.size() 848 return connection_info.input_from_client.size()
850 def get_message_from_client(self, expected_client_id): 851 expected_client_info = self._expected_clients[expected_client_id] 852 connection_info = expected_client_info._Client__connection 853 if connection_info is None: 854 raise Exception('Expected client was not connected yet!') 855 if not connection_info.input_from_client.size(): 856 raise Exception('There is no readable data in expected client\'s FIFO!') 857 # if client_info.this_is_raw_connection: 858 # self._consolidate_raw_messages_in_input_from_client_fifo(client_info) 859 return connection_info.input_from_client.get()
862 def get_messages_from_client(self, expected_client_id): 863 expected_client_info = self._expected_clients[expected_client_id] 864 connection_info = expected_client_info._Client__connection 865 if connection_info is None: 866 raise Exception('Expected client was not connected yet!') 867 try: 868 while True: 869 yield connection_info.input_from_client.get() 870 except FIFOIsEmpty: 871 pass 872 # while client_info.input_from_client.size(): 873 # yield client_info.input_from_client.get()
875 def get_connection_output_fifo_size_for_client(self, expected_client_id): 876 expected_client_info = self._expected_clients[expected_client_id] 877 connection_info = expected_client_info._Client__connection 878 if connection_info is None: 879 raise Exception('Expected client was not connected yet!') 880 return connection_info.output_to_client.size()
882 def send_message_to_client(self, expected_client_id, data): 883 # data = bytes(data) 884 expected_client_info = self._expected_clients[expected_client_id] 885 connection_info = expected_client_info._Client__connection 886 if connection_info is None: 887 raise Exception('Expected client was not connected yet!') 888 if connection_info.this_is_raw_connection: 889 self._send_message_through_connection_raw(connection_info, data) 890 else: 891 self._send_message_through_connection(connection_info, data)
893 def send_messages_to_client(self, expected_client_id, messages_list): 894 # data = bytes(data) 895 expected_client_info = self._expected_clients[expected_client_id] 896 connection_info = expected_client_info._Client__connection 897 if connection_info is None: 898 raise Exception('Expected client was not connected yet!') 899 if connection_info.this_is_raw_connection: 900 self._send_messages_through_connection_raw(connection_info, messages_list) 901 else: 902 self._send_messages_through_connection(connection_info, messages_list)
904 def check_is_client_is_in_raw_connection_mode(self, expected_client_id): 905 expected_client_info = self._expected_clients[expected_client_id] 906 connection_info = expected_client_info._Client__connection 907 if connection_info is None: 908 raise Exception('Expected client was not connected yet!') 909 return connection_info.this_is_raw_connection
911 def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool): 912 expected_client_info = self._expected_clients[expected_client_id] 913 connection_info = expected_client_info._Client__connection 914 if connection_info is None: 915 raise Exception('Expected client was not connected yet!') 916 connection_info.this_is_raw_connection = is_raw
918 def set_inline_processor_for_client(self, expected_client_id, 919 class_for_unknown_clients_inline_processing: type): 920 expected_client_info = self._expected_clients[expected_client_id] 921 connection_info = expected_client_info._Client__connection 922 if connection_info is None: 923 raise Exception('Expected client was not connected yet!') 924 self._set_inline_processor_for_client(connection_info, expected_client_info, 925 class_for_unknown_clients_inline_processing)
927 def close_client_connection(self, expected_client_id, raise_if_already_closed=True): 928 """ 929 Connection will be closed immediately (inside this method) 930 :param expected_client_id: 931 :param raise_if_already_closed: 932 :return: 933 """ 934 if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:') 935 expected_client_info = self._expected_clients[expected_client_id] 936 connection_info = expected_client_info._Client__connection 937 if raise_if_already_closed and (connection_info is None): 938 raise Exception('Expected client was not connected yet!') 939 self.close_connection(connection_info.id)
Connection will be closed immediately (inside this method) :param expected_client_id: :param raise_if_already_closed: :return:
941 def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, 942 raise_if_already_closed=True): 943 """ 944 Connection will be closed immediately (inside main IO loop) 945 :param expected_client_id: 946 :param raise_if_already_closed: 947 :return: 948 """ 949 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:') 950 expected_client_info = self._expected_clients[expected_client_id] 951 connection_info = expected_client_info._Client__connection 952 if raise_if_already_closed and (connection_info is None): 953 raise Exception('Expected client was not connected yet!') 954 self._mark_connection_to_be_closed_immediately(connection_info)
Connection will be closed immediately (inside main IO loop) :param expected_client_id: :param raise_if_already_closed: :return:
956 def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True): 957 """ 958 Connection will be closed when all output will be sent (inside main IO loop). 959 :param expected_client_id: 960 :param raise_if_already_closed: 961 :return: 962 """ 963 if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:') 964 expected_client_info = self._expected_clients[expected_client_id] 965 connection_info = expected_client_info._Client__connection 966 if raise_if_already_closed and (connection_info is None): 967 raise Exception('Expected client was not connected yet!') 968 self._mark_connection_as_ready_to_be_closed(connection_info)
Connection will be closed when all output will be sent (inside main IO loop). :param expected_client_id: :param raise_if_already_closed: :return:
970 def remove_client(self, expected_client_id): 971 if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id)) 972 expected_client_info = self._expected_clients[expected_client_id] 973 if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword)) 974 connection_id = expected_client_info.connection_id 975 if connection_id is None: 976 self._remove_client(expected_client_id) 977 self.remove_connection(connection_id)
979 def add_connection(self, conn, address): 980 """ 981 :param conn: socket 982 :param address: address 983 :return: client ID 984 """ 985 if conn is None: 986 raise TypeError('conn should not be None!') 987 988 self.socket_by_fd[conn.fileno()] = conn 989 conn.setblocking(0) 990 if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS): 991 conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 992 993 new_client_id = self._connections_id_gen() 994 995 client_info = Connection(new_client_id, (conn, address), self) 996 self._connections[new_client_id] = client_info 997 self._connection_by_conn[conn] = new_client_id 998 self._input_check_sockets.add(conn) 999 self._exception_check_sockets.add(conn) 1000 self._we_have_connections_for_select = True 1001 1002 self._unconfirmed_clients.add(conn) 1003 1004 return new_client_id
:param conn: socket :param address: address :return: client ID
1006 def check_connection_existance(self, connection_id): 1007 if connection_id not in self._expected_clients: 1008 return False 1009 if connection_id not in self._connections: 1010 return False 1011 client_info = self._connections[connection_id] 1012 if not client_info.conn.existence: 1013 return False 1014 conn = client_info.conn.result 1015 if conn is None: 1016 return False 1017 return True
1019 def close_connection(self, connection_id): 1020 if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id)) 1021 client_info = self._connections[connection_id] 1022 if not client_info.conn.existence: 1023 if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id)) 1024 return 1025 conn = client_info.conn.result 1026 if conn is None: 1027 if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id)) 1028 return 1029 1030 del self.socket_by_fd[conn.fileno()] 1031 conn.close() 1032 client_info.conn.existence = False 1033 client_info.output_to_client = copy.copy(client_info.output_to_client) # clear all output data to free some 1034 # memory even before destroying 1035 1036 if connection_id in self._connections_marked_as_ready_to_be_closed: 1037 self._connections_marked_as_ready_to_be_closed.remove(connection_id) 1038 if conn in self._connections_marked_to_be_closed_immediately: 1039 self._connections_marked_to_be_closed_immediately.remove(conn) 1040 if conn in self._connection_by_conn: 1041 del self._connection_by_conn[conn] 1042 if conn in self._input_check_sockets: 1043 self._input_check_sockets.remove(conn) 1044 if conn in self._output_check_sockets: 1045 self._output_check_sockets.remove(conn) 1046 if conn in self._exception_check_sockets: 1047 self._exception_check_sockets.remove(conn) 1048 1049 if not self._input_check_sockets: 1050 self._we_have_connections_for_select = False 1051 1052 if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id))
1054 def close_connection_by_conn(self, conn): 1055 # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета. 1056 # И мы сможем обнаружить наличие соответствующей ошибки в коде. 1057 if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn))) 1058 connection_id = self._connection_by_conn[conn] 1059 if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id)) 1060 self.close_connection(connection_id)
1062 def remove_connection(self, connection_id): 1063 # client should NOT be removed immediately after connection close (close_connection): 1064 # code should do it by itself after reading all available input data 1065 if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id)) 1066 client_info = self._connections[connection_id] 1067 if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword)) 1068 if client_info.conn.existence: 1069 self.close_connection(connection_id) 1070 conn = client_info.conn.result 1071 if conn is None: 1072 return 1073 1074 if conn in self._conns_of_unexpected_clients: 1075 self._remove_unexpected_client(self._conns_of_unexpected_clients[conn]) 1076 1077 # if conn in self._conns_of_expected_clients: 1078 # self._remove_client(self._conns_of_expected_clients[conn]) 1079 if client_info.connected_expected_client_id is not None: 1080 self._remove_client(client_info.connected_expected_client_id) 1081 1082 client_info.connected_expected_client_id = None 1083 client_info.connected_expected_client = None 1084 1085 if connection_id in self._connections_marked_as_ready_to_be_deleted: 1086 self._connections_marked_as_ready_to_be_deleted.remove(connection_id) 1087 1088 client_info.conn.existence = False 1089 client_info.conn.result = None 1090 1091 del self._connections[connection_id] 1092 if conn in self._connection_by_conn: 1093 del self._connection_by_conn[conn] 1094 if conn in self._input_check_sockets: 1095 self._input_check_sockets.remove(conn) 1096 if conn in self._output_check_sockets: 1097 self._output_check_sockets.remove(conn) 1098 if conn in self._exception_check_sockets: 1099 self._exception_check_sockets.remove(conn) 1100 1101 # client_info.remove()
Inherited Members
- cengal.io.asock_io.versions.v_0.base.IOCoreMemoryManagement
- global__data_size_limit
- global_in__data_size_limit
- global_in__data_full_size
- global_in__deletable_data_full_size
- global_out__data_size_limit
- global_out__data_full_size
- global_out__deletable_data_full_size
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
2066@contextmanager 2067def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool=False, backlog: int=1, 2068 should_have_all_desired_gate_connections: bool=False): 2069 try: 2070 desired_amount_of_gate_connections = len(asock_io_core_obj.gates_connections_settings) 2071 gate_connections_num = asock_io_core_obj.listen(backlog) 2072 if should_have_gate_connections and (not gate_connections_num): 2073 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!' 2074 asock_io_core_obj._log(error_text) 2075 raise ThereAreNoGateConections(error_text) 2076 else: 2077 if should_have_all_desired_gate_connections: 2078 if desired_amount_of_gate_connections != gate_connections_num: 2079 error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: NOT ENOUGH GOOD GATE CONNECTIONS!' 2080 asock_io_core_obj._log(error_text) 2081 raise NotEnoughGateConnections(error_text) 2082 print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num)) 2083 yield asock_io_core_obj 2084 except: 2085 raise 2086 finally: 2087 asock_io_core_obj.io_iteration() 2088 asock_io_core_obj.destroy() 2089 asock_io_core_obj.io_iteration() 2090 if __debug__: print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150])) 2091 if __debug__: print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150]))
2094class CheckIsRawConnection: 2095 def __call__(self, asock_io_core: ASockIOCore, connection_info: Connection)->bool: 2096 """ 2097 :param asock_io_core: 2098 :param connection_info: 2099 :return: "True" if it is RAW connection for Unknow Client. "False" otherwise. 2100 """ 2101 result = False 2102 try: 2103 if connection_info.conn.result.family in {socket.AF_INET, socket.AF_INET6}: 2104 if connection_info.addr.result[0] not in asock_io_core.set_of_gate_addresses: 2105 # If connected not from local IP address 2106 result = True 2107 except: 2108 pass 2109 return result