cengal.io.asock_io.versions.v_0.tcp_app_server

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19"""
  20Module Docstring
  21Docstrings: http://www.python.org/dev/peps/pep-0257/
  22"""
  23
  24
  25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
  26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
  27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
  28__license__ = "Apache License, Version 2.0"
  29__version__ = "4.4.1"
  30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
  31__email__ = "gtalk@butenkoms.space"
  32# __status__ = "Prototype"
  33__status__ = "Development"
  34# __status__ = "Production"
  35
  36
  37import os
  38import sys
  39import traceback
  40import select
  41import time
  42# from fileinput import input
  43from typing import Set, Iterable, Optional, Tuple, Dict
  44import copy
  45from collections import deque
  46
  47from cengal.data_generation.id_generator import IDGenerator, GeneratorType
  48from .base import *
  49
  50from cengal.data_containers.dynamic_list_of_pieces import DynamicListOfPiecesDequeWithLengthControl
  51from cengal.data_containers.fast_fifo import FIFODequeWithLengthControl, FIFOIsEmpty
  52from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType
  53from cengal.code_flow_control.smart_values.versions.v_0 import ResultExistence
  54from contextlib import contextmanager
  55from cengal.code_inspection.line_profiling import set_profiler
  56from .recv_buff_size_computer import RecvBuffSizeComputer
  57from math import ceil
  58
  59"""
  60Module Docstring
  61Docstrings: http://www.python.org/dev/peps/pep-0257/
  62
  63CAUTION: some code here is optimized for speed - not for readability or beauty.
  64"""
  65
  66__author__ = 'ButenkoMS <gtalk@butenkoms.space>'
  67
# Module-wide profiling switch: exactly one of the two calls below should be active.
# NOTE(review): presumably this enables/disables the line-profiling hooks provided by
# cengal.code_inspection.line_profiling (see the commented-out `# @profile` marker in this file) - confirm.
# set_profiler(True)
set_profiler(False)
  70
  71
  72class IoIterationResult:
  73    """
  74    ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки
  75    (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых
  76    сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать
  77    новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве
  78    случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс,
  79    при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля
  80    успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же
  81    считывание имеющихся результатов операций)))
  82    """
  83
  84    def __init__(self):
  85        self.newly_connected_expected_clients = set()
  86        self.newly_connected_unknown_clients = set()
  87        self.clients_with_disconnected_connection = set()
  88        self.clients_have_data_to_read = set()
  89        self.clients_with_empty_output_fifo = set()
  90
  91    def update(self, other):
  92        self.newly_connected_expected_clients.update(other.newly_connected_expected_clients)
  93        self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients)
  94        self.clients_with_disconnected_connection.update(
  95            other.clients_with_disconnected_connection)
  96        self.clients_have_data_to_read.update(other.clients_have_data_to_read)
  97        self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo)
  98
  99    def remove(self, item):
 100        if item in self.newly_connected_expected_clients:
 101            self.newly_connected_expected_clients.remove(item)
 102        if item in self.newly_connected_unknown_clients:
 103            self.newly_connected_unknown_clients.remove(item)
 104        if item in self.clients_with_disconnected_connection:
 105            self.clients_with_disconnected_connection.remove(item)
 106        if item in self.clients_have_data_to_read:
 107            self.clients_have_data_to_read.remove(item)
 108        if item in self.clients_with_empty_output_fifo:
 109            self.clients_with_empty_output_fifo.remove(item)
 110
 111    def clear(self):
 112        self.newly_connected_expected_clients.clear()
 113        self.newly_connected_unknown_clients.clear()
 114        self.clients_with_disconnected_connection.clear()
 115        self.clients_have_data_to_read.clear()
 116        self.clients_with_empty_output_fifo.clear()
 117
 118
 119class ASockIOCoreMemoryManagement(IOCoreMemoryManagement):
 120    def __init__(self):
 121        super(ASockIOCoreMemoryManagement, self).__init__()
 122
 123        self.socket_read_fixed_buffer_size = ResultExistence(True,
 124                                                             1024 ** 2)  # 1024**2 is the fastest fixed read buffer.
 125
 126    def link_to(self, parent):
 127        super(ASockIOCoreMemoryManagement, self).link_to(parent)
 128        try:
 129            self.socket_read_fixed_buffer_size = parent.socket_read_fixed_buffer_size
 130        except AttributeError:
 131            pass
 132
 133
 134class Connection:
 135    def __init__(self,
 136                 connection_id=None,
 137                 connection__conn_addr: tuple = None,
 138                 global_memory_management: ASockIOCoreMemoryManagement = None
 139                 ):
 140        """
 141        
 142        :param connection_id: ID for this connection
 143        :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address
 144        :param global_memory_management: global memory management obj
 145        """
 146        self.id = connection_id
 147        if connection__conn_addr is None:
 148            self.conn = ResultExistence(False, None)
 149            self.addr = ResultExistence(False, None)
 150        else:
 151            self.conn = ResultExistence(True, connection__conn_addr[0])
 152            self.addr = ResultExistence(True, connection__conn_addr[1])
 153
 154        self.addr_info = None
 155        self.host_names = None
 156
 157        self.recv_buff_size_computer = RecvBuffSizeComputer()
 158        self.recv_buff_size = 0
 159        self.calc_new_recv_buff_size(0)
 160
 161        self.should_be_closed = False  # socket should be closed immediately. For example because of IO error.
 162        self.ready_to_be_closed = False  # socket should be closed, after all messages had been sent to client.
 163        self.ready_for_deletion = False  # connection should be deleted immediately. For example because of unexpected
 164        #   keyword.
 165
 166        self.keyword = None
 167
 168        self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl(
 169            external_data_length=global_memory_management.global_in__data_full_size)
 170        self.current_message_length = None  # length of current input message (or None, if size waw not read yet)
 171        self.input_from_client = FIFODequeWithLengthControl(
 172            external_data_full_size=global_memory_management.global_in__data_full_size)
 173        self.current_memoryview_output = None
 174        self.current_memoryview_input = None
 175        self.output_to_client = FIFODequeWithLengthControl(
 176            external_data_full_size=global_memory_management.global_out__data_full_size)
 177
 178        self.this_is_raw_connection = False
 179
 180        self.connected_expected_client_id = None
 181        self.connected_expected_client = None
 182        self.has_inline_processor = False
 183
 184    def calc_new_recv_buff_size(self, last_recv_amount):
 185        self.recv_buff_size = self.recv_buff_size_computer.calc_new_recv_buff_size(last_recv_amount)
 186
 187    def remove(self):
 188        self.raw_input_from_client.remove()
 189        self.input_from_client.remove()
 190        self.output_to_client.remove()
 191
 192
 193class InlineProcessor:
 194    __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names',
 195                 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately',
 196                 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages',
 197                 '__hold__client_id')
 198
 199    def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None,
 200                 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None):
 201        """
 202
 203        :param keyword: client keyword. You may check for a known keywords to act appropriately
 204        :param socket_family:
 205        :param socket_type:
 206        :param socket_proto:
 207        :param addr_info: result of socket.getaddrinfo() call
 208        :param host_names: result of socket.gethostbyaddr() call
 209        """
 210        self.client_id = client_id
 211        self.keyword = keyword
 212        self.socket_family = socket_family
 213        self.socket_type = socket_type
 214        self.socket_proto = socket_proto
 215        self.addr_info = addr_info
 216        self.host_names = host_names
 217        self.is_in_raw_mode = None
 218        self.__hold__client_id = client_id
 219        self.__set__is_in_raw_mode = False
 220        self.__set__mark_socket_as_should_be_closed_immediately = False
 221        self.__set__mark_socket_as_ready_to_be_closed = False
 222        self.__external_parameters_set_trigger = external_parameters_set_trigger
 223
 224        # self.output_messages = FIFODequeWithLengthControl()
 225        self.output_messages = deque()
 226        # self.output_messages = list()
 227
 228    def on__data_received(self, data: bytes):
 229        """
 230        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
 231        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 
 232        be logged
 233        :param data: piece of input data if connection is in RAW-mode and full message otherwise.
 234        """
 235        pass
 236
 237    def on__output_buffers_are_empty(self):
 238        """
 239        Will be called immediately when all output data was send.
 240        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data
 241        Any unhandled exception will lead to force destroying of current Inline Processor object. Also situation will 
 242        be logged
 243        """
 244        pass
 245
 246    def on__connection_lost(self):
 247        """
 248        Will be called after connection was closed. Current Inline Processor object will be destroyed after this call.
 249        Situation with unhandled exception will be logged.
 250        """
 251        pass
 252
 253    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
 254        self.__set__is_in_raw_mode = is_in_raw_mode
 255        self.__external_parameters_set_trigger.add(self.__hold__client_id)
 256
 257    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
 258        self.__set__mark_socket_as_should_be_closed_immediately = mark_socket_as
 259        self.__external_parameters_set_trigger.add(self.__hold__client_id)
 260
 261    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
 262        self.__set__mark_socket_as_ready_to_be_closed = mark_socket_as
 263        self.__external_parameters_set_trigger.add(self.__hold__client_id)
 264
 265
 266class Client:
 267    def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None):
 268        """
 269
 270        :param client_id: ID of the expected client
 271        :param client_tcp_id: ID of the connection
 272        :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client,
 273            and all - for the super server.
 274        """
 275        self.id = client_id
 276        self.connection_id = client_tcp_id
 277        self.__connection = None  # type: Optional[Connection]
 278        self.connection_settings = connection_settings
 279        self.connection_settings.check()
 280
 281        self.will_use_raw_client_connection = False
 282        self.will_use_raw_connection_without_handshake = False
 283        self.this_is_unknown_client = False
 284
 285        self.obj_for_inline_processing = None
 286
 287    def get_connection(self)->Connection:
 288        return self.__connection
 289
 290
 291class ASockIOCore(ASockIOCoreMemoryManagement):
 292    def __init__(self, gates_connections_settings: Set[ConnectionSettings]):
 293        """
 294        Port should not be open to a external world!
 295        :param gates_connections_settings: set() of ConnectionSettings()
 296        :return:
 297        """
 298        super(ASockIOCore, self).__init__()
 299
 300        if os.name != 'nt':
 301            self.po = select.poll()
 302        self.last_all_sockets = set()  # type: Set[int]
 303        self.socket_by_fd = dict()  # type: Dict[int, socket.socket]
 304
 305        self.check_sockets_sum_time = 0.0
 306        self.check_sockets_qnt = 0
 307        self.check_sockets_max_time = 0.0
 308
 309        self.gates_connections_settings = gates_connections_settings
 310        if not self.gates_connections_settings:
 311            self.gates_connections_settings = set()
 312            # raise Exception('gates_connections_settings should be provided!')
 313        for gates_connections_settings in self.gates_connections_settings:
 314            gates_connections_settings.check()
 315        self.faulty_connection_settings = set()
 316        self._connection_settings_by_gate_conn = dict()
 317
 318        self.set_of_gate_addresses = set()
 319        self._gate = set()
 320        self.reuse_gate_addr = False
 321        self.reuse_gate_port = False
 322
 323        self.message_size_len = MESSAGE_SIZE_LEN
 324        self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED
 325
 326        self._connections = dict()  # key: ID; data: Connection()
 327        self._connection_by_conn = dict()  # key: conn; data: ID
 328        self._connections_id_gen = IDGenerator()
 329
 330        self._connections_marked_as_ready_to_be_closed = set()
 331        self._connections_marked_to_be_closed_immediately = set()
 332        self._connections_marked_as_ready_to_be_deleted = set()
 333
 334        self._unconfirmed_clients = set()
 335
 336        self._we_have_connections_for_select = False
 337        self._input_check_sockets = set()
 338        self._output_check_sockets = set()
 339        self._exception_check_sockets = set()
 340
 341        # ID (GUID) и другая информация клиентов, подключение которых явно ожидается
 342        self._expected_clients = dict()  # key: ID; data: Client()
 343        self._expected_clients_id_gen = IDGenerator()
 344        self._keywords_for_expected_clients = dict()  # key: keyword; data: info
 345        self._conns_of_expected_clients = dict()  # key: conn; data expected_client_ID
 346
 347        self.unexpected_clients_are_allowed = True
 348
 349        # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в
 350        # список ожидаемых клиентов - данный клиент будет автоматически подхвачен.
 351        self._unexpected_clients = dict()  # key: ID; data: Client()
 352        self._unexpected_clients_id_gen = IDGenerator()
 353        self._keywords_of_unexpected_clients = dict()
 354        self._conns_of_unexpected_clients = dict()
 355
 356        self._io_iteration_result = IoIterationResult()
 357
 358        self.raw_checker_for_new_incoming_connections = CheckIsRawConnection()
 359        self.need_to_auto_check_incoming_raw_connection = False
 360        self.unknown_clients_are_allowed = False
 361        self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string)
 362        self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: '
 363
 364        self.echo_log = False
 365        self._internal_log = deque()
 366
 367        self.recv_sizes = deque()
 368        self.recv_buff_sizes = deque()
 369
 370        self.should_get_client_addr_info_on_connection = True
 371
 372        self.use_nodelay_inet = False
 373        self.use_speed_optimized_socket_read = False
 374
 375        self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \
 376            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
 377        self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \
 378            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
 379        self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \
 380            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
 381
 382        self.class_for_unknown_clients_inline_processing = None
 383
 384        self._clients_with_inline_processors_that_need_to_apply_parameters = set()
 385
 386    @staticmethod
 387    def check_sockets_select(read: Set[int], write: Set[int], error: Set[int],
 388                             timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
 389        all_sockets = read | write | error
 390        if all_sockets:
 391            return select.select(read,
 392                                 write,
 393                                 error,
 394                                 timeout)
 395        else:
 396            return set(), set(), set()
 397
 398    def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int],
 399                           timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
 400        read_events = select.POLLIN | select.POLLPRI
 401        write_events = select.POLLOUT
 402        except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL
 403        if hasattr(select, 'POLLRDHUP'):
 404            except_events |= select.POLLRDHUP
 405        readable_events = {select.POLLIN, select.POLLPRI}
 406        writable_events = {select.POLLOUT}
 407        exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL}
 408        if hasattr(select, 'POLLRDHUP'):
 409            exceptional_events.add(select.POLLRDHUP)
 410        all_events_set = readable_events | writable_events | exceptional_events
 411
 412        timeout = int(timeout * 1000)
 413
 414        # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets))
 415        all_sockets = read | write | error
 416        # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets))
 417        new_sockets = all_sockets - self.last_all_sockets
 418        # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets))
 419        still_sockets = all_sockets & self.last_all_sockets
 420        # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets))
 421        deleted_sockets = self.last_all_sockets - all_sockets
 422        # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets))
 423        self.last_all_sockets = all_sockets
 424
 425        for socket_fd in new_sockets:
 426            event_mask = 0
 427            if socket_fd in read:
 428                event_mask |= read_events
 429            if socket_fd in write:
 430                event_mask |= write_events
 431            if socket_fd in error:
 432                event_mask |= except_events
 433            # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
 434            self.po.register(socket_fd, event_mask)
 435
 436        for socket_fd in still_sockets:
 437            event_mask = 0
 438            if socket_fd in read:
 439                event_mask |= read_events
 440            if socket_fd in write:
 441                event_mask |= write_events
 442            if socket_fd in error:
 443                event_mask |= except_events
 444            # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
 445            self.po.modify(socket_fd, event_mask)
 446
 447        for socket_fd in deleted_sockets:
 448            # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd))
 449            self.po.unregister(socket_fd)
 450
 451        poll_result = self.po.poll(timeout)
 452        # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result))
 453        # sys.stdout.flush()
 454
 455        readable = set()
 456        writable = set()
 457        exceptional = set()
 458        for socket_fd, event_mask in poll_result:
 459            socket_events_set = set()
 460            for another_event in all_events_set:
 461                if event_mask & another_event:
 462                    socket_events_set.add(another_event)
 463
 464            if socket_events_set & readable_events:
 465                readable.add(socket_fd)
 466            if socket_events_set & writable_events:
 467                writable.add(socket_fd)
 468            if socket_events_set & exceptional_events:
 469                exceptional.add(socket_fd)
 470
 471        return readable, writable, exceptional
 472
 473    def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket],
 474                      timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]:
 475        all_sockets = read | write | error
 476        if all_sockets:
 477            read_fd = set()
 478            write_fd = set()
 479            error_fd = set()
 480            for conn in read:
 481                read_fd.add(conn.fileno())
 482            for conn in write:
 483                write_fd.add(conn.fileno())
 484            for conn in error:
 485                error_fd.add(conn.fileno())
 486
 487            check_sockets = self.check_sockets_select
 488            if os.name != 'nt':
 489                check_sockets = self.check_sockets_poll
 490
 491            readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd,
 492                                                                     write_fd,
 493                                                                     error_fd,
 494                                                                     timeout)
 495            readable = set()
 496            writable = set()
 497            exceptional = set()
 498            for fd in readable_fd:
 499                readable.add(self.socket_by_fd[fd])
 500            for fd in writable_fd:
 501                writable.add(self.socket_by_fd[fd])
 502            for fd in exceptional_fd:
 503                exceptional.add(self.socket_by_fd[fd])
 504            return readable, writable, exceptional
 505        else:
 506            return set(), set(), set()
 507
 508    def gate_io_iteration(self, timeout=0.0):
 509        result = self._io_iteration_result
 510        if self._gate:
 511            readable, writable, exceptional = self.check_sockets_select(self._gate,
 512                                                                        set(),
 513                                                                        set(),
 514                                                                        timeout)
 515
 516            # Handle inputs
 517            for s in readable:
 518                self._read_data_from_socket(s)
 519
 520        self._io_iteration_result = IoIterationResult()
 521        return result
 522
 523    # @profile
    def io_iteration(self, timeout=0.0):
        """
        Run one IO pass over the sockets tracked by this core.

        When self._we_have_connections_for_select is set, the sockets are checked with self.check_sockets() and
        then, in this order: readable sockets are read (only while the input buffers are within their limit),
        writable sockets are written, sockets reported as exceptional are passed to the error handler, pending
        inline-processor parameters are applied, connections marked to be closed immediately are closed, and
        connections marked as ready to be deleted are removed.

        :param timeout: timeout in seconds, passed to self.check_sockets()
        :return: the IoIterationResult that was current during this call; a fresh one replaces it in
            self._io_iteration_result before returning
        """
        result = self._io_iteration_result

        if self._we_have_connections_for_select:
            # need_to_process = False
            # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets
            # if not (all_sockets - self._gate):
            #     timeout = 0.01

            need_to_repeat = True

            while need_to_repeat:
                output_check_sockets = set()

                # Writable sockets are checked only if at least one connection has queued output;
                # otherwise an empty set is passed as the "write" set.
                need_to_check_writable_sockets = False
                for s in self._output_check_sockets:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    if curr_client_info.output_to_client.size():
                        need_to_check_writable_sockets = True
                        break

                if need_to_check_writable_sockets:
                    output_check_sockets = self._output_check_sockets

                # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(),
                #                                                    len(self._input_check_sockets),
                #                                                    len(self._output_check_sockets),
                #                                                    len(self._exception_check_sockets)))
                # sys.stdout.flush()
                # The check_sockets() call is timed; sum, count and maximum are kept on the instance.
                check_sockets_start_time = time.perf_counter()
                readable, writable, exceptional = self.check_sockets(self._input_check_sockets,
                                                                     output_check_sockets,
                                                                     self._exception_check_sockets,
                                                                     timeout)
                check_sockets_finish_time = time.perf_counter()
                check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time
                self.check_sockets_sum_time += check_sockets_delta_time
                self.check_sockets_qnt += 1
                if self.check_sockets_max_time < check_sockets_delta_time:
                    self.check_sockets_max_time = check_sockets_delta_time
                # NOTE(review): this average is only referenced by the commented-out debug print below.
                check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt
                # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format(
                #     check_sockets_delta_time,
                #     check_socket_average_time,
                #     self.check_sockets_sum_time,
                #     self.check_sockets_max_time
                # ))
                # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(),
                #                                                    len(readable),
                #                                                    len(writable),
                #                                                    len(exceptional)))
                # sys.stdout.flush()

                # Reading is allowed only while (full size - deletable size) of the input data does not exceed
                # the input limit.
                read_is_forbidden = True
                if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \
                        <= self.global_in__data_size_limit.result:
                    read_is_forbidden = False

                    # Handle inputs
                    for s in readable:
                        read_result = self._read_data_from_socket(s)
                        if read_result:
                            # Sockets still in _unconfirmed_clients get their keyword processed first.
                            if s in self._unconfirmed_clients:
                                self._process_client_keyword(s)
                                self._check_is_client_have_data_to_read_in_fifo(s)
                            else:
                                self._client_have_data_to_read_in_fifo(s)

                if __debug__:
                    # test_trigger() returns None unless there is something to report (see the check below).
                    read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
                        read_is_forbidden)
                    if read_is_forbidden_test is not None:
                        if read_is_forbidden_test:
                            print('Read is suppressed until data will be processed.')
                        else:
                            print('Read is allowed: data is processed.')

                # Handle outputs
                for s in writable:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    self._write_data_to_socket(curr_client_info)
                    # self._write_data_to_socket(s)

                # Handle "exceptional conditions"
                for s in exceptional:
                    self._handle_connection_error(s)

                # Set parameters for inline processors
                if self._clients_with_inline_processors_that_need_to_apply_parameters:
                    for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters:
                        expected_client_info = self._expected_clients[ec_id]
                        # Name-mangled access to the private `__connection` attribute of Client.
                        connection_info = expected_client_info._Client__connection
                        self._inline_processor__apply_parameters(connection_info, expected_client_info)
                    self._clients_with_inline_processors_that_need_to_apply_parameters.clear()

                # Close sockets
                if self._connections_marked_to_be_closed_immediately:
                    # The set is swapped for a fresh one before iterating, so marks added during the loop
                    # go to the new set.
                    sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately
                    self._connections_marked_to_be_closed_immediately = set()
                    for closeable_socket in sockets_should_be_closed_immediately:
                        connection_id = self._connection_by_conn[closeable_socket]
                        # self.close_connection_by_conn(closeable_socket)
                        self.close_connection(connection_id)
                        self._inline_processor__on__connection_lost_by_connection_id(connection_id)

                # Removing clients
                if self._connections_marked_as_ready_to_be_deleted:
                    # Same swap-then-iterate pattern as for the connections closed above.
                    clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted
                    self._connections_marked_as_ready_to_be_deleted = set()
                    for faulty_socket in clients_ready_to_be_deleted:
                        self.remove_connection_by_conn(faulty_socket)

                # Repeat is requested while (full size - deletable size) of the output data exceeds the
                # output limit.
                if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \
                        <= self.global_out__data_size_limit.result:
                    need_to_repeat = False
                else:
                    need_to_repeat = True

                # NOTE(review): this assignment unconditionally overrides the value computed just above, so
                # the while-loop body runs exactly once per call and the trigger below is always tested with
                # False. Confirm whether the repeat logic was disabled on purpose.
                need_to_repeat = False

                if __debug__:
                    need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger(
                        need_to_repeat)
                    if need_to_repeat_show is not None:
                        if need_to_repeat_show:
                            print('Work is suppressed until data will be out.')
                        else:
                            print('Work is allowed: data is out.')

        # Hand the accumulated result to the caller and start a fresh one for the next iteration.
        self._io_iteration_result = IoIterationResult()
        return result
 661
 662    def listen(self, backlog=1):
 663        # backlog = backlog or 1
 664
 665        new_connection_settings = set()
 666        for gate_connection_settings in self.gates_connections_settings:
 667            gate = None
 668            try:
 669                gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type,
 670                                     gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno)
 671                self.socket_by_fd[gate.fileno()] = gate
 672                gate.setblocking(0)
 673                # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 674            except (socket.error, OSError) as err:
 675                gate = None
 676                if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format(
 677                    err.errno, err.strerror))
 678                continue
 679
 680            if self.reuse_gate_port:
 681                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
 682
 683            try:
 684                self._check_for_initial_af_unix_socket_unlink(gate_connection_settings)
 685                gate.bind(gate_connection_settings.socket_address)
 686            except (socket.error, OSError) as err:
 687                del self.socket_by_fd[gate.fileno()]
 688                gate.close()
 689                gate = None
 690                if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format(
 691                    gate_connection_settings.socket_address, err.errno, err.strerror))
 692                continue
 693            try:
 694                gate.listen(backlog)
 695            except (socket.error, OSError) as err:
 696                del self.socket_by_fd[gate.fileno()]
 697                gate.close()
 698                gate = None
 699                if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format(
 700                    gate_connection_settings.socket_address, err.errno, err.strerror))
 701                continue
 702
 703            if self.reuse_gate_addr:
 704                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 705
 706            self._input_check_sockets.add(gate)
 707            self._exception_check_sockets.add(gate)
 708
 709            if gate:
 710                self._gate.add(gate)
 711                if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS:
 712                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0])
 713                elif socket.AF_UNIX == gate_connection_settings.socket_family:
 714                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
 715                else:
 716                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
 717                    self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY')
 718                self._connection_settings_by_gate_conn[gate] = gate_connection_settings
 719                self._we_have_connections_for_select = True
 720                new_connection_settings.add(gate_connection_settings)
 721            else:
 722                self.faulty_connection_settings.add(gate_connection_settings)
 723        self.gates_connections_settings = new_connection_settings
 724
 725        return len(self.gates_connections_settings)
 726
 727    def close_all_connections(self):
 728        if __debug__: self._log('CLOSE ALL CONNECTIONS:')
 729        clients_list = dict(self._connections)
 730        for connection_id, client_info in clients_list.items():
 731            self.close_connection(connection_id)
 732
 733    def remove_all_connections(self):
 734        clients_list = dict(self._connections)
 735        for connection_id, client_info in clients_list.items():
 736            self.remove_connection(connection_id)
 737
 738    def close(self):
 739        for gate in self._gate:
 740            del self.socket_by_fd[gate.fileno()]
 741            gate.close()
 742
 743            if gate in self._input_check_sockets:
 744                self._input_check_sockets.remove(gate)
 745            if gate in self._exception_check_sockets:
 746                self._exception_check_sockets.remove(gate)
 747
 748            if not self._input_check_sockets:
 749                self._we_have_connections_for_select = False
 750        self._unlink_good_af_unix_sockets()
 751
    def destroy(self):
        """
        Shut the server down: run close(), then close all connections and finally remove all of them.
        """
        self.close()
        self.close_all_connections()
        self.remove_all_connections()
 756
 757    def add_client(self, expected_client_info: Client):
 758        """
 759        Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в
 760        будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте.
 761        При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет
 762        зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления.
 763        Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий -
 764        перед закрытием и уничтожением сервера) цикл обработки io_iteration().
 765        :param expected_client_info: link to Client()
 766        :return: expected_client_id
 767        """
 768        if (expected_client_info.connection_settings.keyword is None) \
 769                and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role):
 770            raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!')
 771
 772        if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients:
 773            raise Exception('Expected Client with keyword "{}" is already registered!'.format(
 774                expected_client_info.connection_settings.keyword))
 775
 776        expected_client_info.id = self._expected_clients_id_gen()
 777
 778        if self.unexpected_clients_are_allowed:
 779            if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients:
 780                # клиент уже подключен
 781                unexpected_client_id = self._keywords_of_unexpected_clients[
 782                    expected_client_info.connection_settings.keyword]
 783                unexpected_client_info = self._unexpected_clients[unexpected_client_id]
 784                connection_info = expected_client_info._Client__connection
 785                if (
 786                    unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \
 787                        (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role):
 788                    # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже
 789                    # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении
 790                    # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся
 791                    # подключение.
 792                    # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически
 793                    # отключен и соединение будет установлено в новом сокете.
 794                    expected_client_info.connection_id = unexpected_client_info.connection_id
 795                    expected_client_info._Client__connection = \
 796                        unexpected_client_info._Client__connection
 797                    self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id
 798                    connection_info.connected_expected_client_id = expected_client_info.id
 799                    connection_info.connected_expected_client = expected_client_info
 800                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id)
 801                else:
 802                    # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже
 803                    # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером.
 804                    # Необходимо его отключить.
 805                    self._mark_connection_to_be_closed_immediately(connection_info)
 806                    self._mark_connection_as_ready_for_deletion(connection_info)
 807                self._remove_unexpected_client(unexpected_client_id)
 808            else:
 809                # клиент еще не подключен
 810                expected_client_info.connection_id = None
 811                expected_client_info._Client__connection = None
 812
 813        if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role:
 814            self._connect_to_super_server(expected_client_info)
 815
 816        self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id
 817        self._expected_clients[expected_client_info.id] = expected_client_info
 818
 819        return expected_client_info.id
 820
    def get_client_id_by_keyword(self, expected_client_keyword):
        """
        Look up the id of the expected client registered under the given keyword.

        :param expected_client_keyword: expected_client_keyword
        :return: expected_client_id (the value stored in self._keywords_for_expected_clients)
        :raises KeyError: if no expected client is registered under this keyword
        """
        return self._keywords_for_expected_clients[expected_client_keyword]
 827
    def get_client_info(self, expected_client_id):
        """
        Return the object registered in self._expected_clients under the given id.

        :param expected_client_id: expected_client_id
        :return: link to Client()
        :raises KeyError: if the id is not registered
        """
        return self._expected_clients[expected_client_id]
 834
 835    def get_connection_input_fifo_size_for_client(self, expected_client_id):
 836        expected_client_info = self._expected_clients[expected_client_id]
 837        connection_info = expected_client_info._Client__connection
 838        if connection_info is None:
 839            raise Exception('Expected client was not connected yet!')
 840        # if client_info.this_is_raw_connection:
 841        #     if client_info.input_from_client.size():
 842        #         return 1
 843        #     else:
 844        #         return 0
 845        # else:
 846        #     return client_info.input_from_client.size()
 847        return connection_info.input_from_client.size()
 848
 849    def get_message_from_client(self, expected_client_id):
 850        expected_client_info = self._expected_clients[expected_client_id]
 851        connection_info = expected_client_info._Client__connection
 852        if connection_info is None:
 853            raise Exception('Expected client was not connected yet!')
 854        if not connection_info.input_from_client.size():
 855            raise Exception('There is no readable data in expected client\'s FIFO!')
 856        # if client_info.this_is_raw_connection:
 857        #     self._consolidate_raw_messages_in_input_from_client_fifo(client_info)
 858        return connection_info.input_from_client.get()
 859
 860    # @profile
 861    def get_messages_from_client(self, expected_client_id):
 862        expected_client_info = self._expected_clients[expected_client_id]
 863        connection_info = expected_client_info._Client__connection
 864        if connection_info is None:
 865            raise Exception('Expected client was not connected yet!')
 866        try:
 867            while True:
 868                yield connection_info.input_from_client.get()
 869        except FIFOIsEmpty:
 870            pass
 871            # while client_info.input_from_client.size():
 872            #     yield client_info.input_from_client.get()
 873
 874    def get_connection_output_fifo_size_for_client(self, expected_client_id):
 875        expected_client_info = self._expected_clients[expected_client_id]
 876        connection_info = expected_client_info._Client__connection
 877        if connection_info is None:
 878            raise Exception('Expected client was not connected yet!')
 879        return connection_info.output_to_client.size()
 880
 881    def send_message_to_client(self, expected_client_id, data):
 882        # data = bytes(data)
 883        expected_client_info = self._expected_clients[expected_client_id]
 884        connection_info = expected_client_info._Client__connection
 885        if connection_info is None:
 886            raise Exception('Expected client was not connected yet!')
 887        if connection_info.this_is_raw_connection:
 888            self._send_message_through_connection_raw(connection_info, data)
 889        else:
 890            self._send_message_through_connection(connection_info, data)
 891
 892    def send_messages_to_client(self, expected_client_id, messages_list):
 893        # data = bytes(data)
 894        expected_client_info = self._expected_clients[expected_client_id]
 895        connection_info = expected_client_info._Client__connection
 896        if connection_info is None:
 897            raise Exception('Expected client was not connected yet!')
 898        if connection_info.this_is_raw_connection:
 899            self._send_messages_through_connection_raw(connection_info, messages_list)
 900        else:
 901            self._send_messages_through_connection(connection_info, messages_list)
 902
 903    def check_is_client_is_in_raw_connection_mode(self, expected_client_id):
 904        expected_client_info = self._expected_clients[expected_client_id]
 905        connection_info = expected_client_info._Client__connection
 906        if connection_info is None:
 907            raise Exception('Expected client was not connected yet!')
 908        return connection_info.this_is_raw_connection
 909
 910    def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool):
 911        expected_client_info = self._expected_clients[expected_client_id]
 912        connection_info = expected_client_info._Client__connection
 913        if connection_info is None:
 914            raise Exception('Expected client was not connected yet!')
 915        connection_info.this_is_raw_connection = is_raw
 916
 917    def set_inline_processor_for_client(self, expected_client_id,
 918                                        class_for_unknown_clients_inline_processing: type):
 919        expected_client_info = self._expected_clients[expected_client_id]
 920        connection_info = expected_client_info._Client__connection
 921        if connection_info is None:
 922            raise Exception('Expected client was not connected yet!')
 923        self._set_inline_processor_for_client(connection_info, expected_client_info,
 924                                              class_for_unknown_clients_inline_processing)
 925
 926    def close_client_connection(self, expected_client_id, raise_if_already_closed=True):
 927        """
 928        Connection will be closed immediately (inside this method)
 929        :param expected_client_id:
 930        :param raise_if_already_closed:
 931        :return:
 932        """
 933        if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:')
 934        expected_client_info = self._expected_clients[expected_client_id]
 935        connection_info = expected_client_info._Client__connection
 936        if raise_if_already_closed and (connection_info is None):
 937            raise Exception('Expected client was not connected yet!')
 938        self.close_connection(connection_info.id)
 939
 940    def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id,
 941                                                               raise_if_already_closed=True):
 942        """
 943        Connection will be closed immediately (inside main IO loop)
 944        :param expected_client_id:
 945        :param raise_if_already_closed:
 946        :return:
 947        """
 948        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:')
 949        expected_client_info = self._expected_clients[expected_client_id]
 950        connection_info = expected_client_info._Client__connection
 951        if raise_if_already_closed and (connection_info is None):
 952            raise Exception('Expected client was not connected yet!')
 953        self._mark_connection_to_be_closed_immediately(connection_info)
 954
 955    def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True):
 956        """
 957        Connection will be closed when all output will be sent (inside main IO loop).
 958        :param expected_client_id:
 959        :param raise_if_already_closed:
 960        :return:
 961        """
 962        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:')
 963        expected_client_info = self._expected_clients[expected_client_id]
 964        connection_info = expected_client_info._Client__connection
 965        if raise_if_already_closed and (connection_info is None):
 966            raise Exception('Expected client was not connected yet!')
 967        self._mark_connection_as_ready_to_be_closed(connection_info)
 968
 969    def remove_client(self, expected_client_id):
 970        if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id))
 971        expected_client_info = self._expected_clients[expected_client_id]
 972        if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword))
 973        connection_id = expected_client_info.connection_id
 974        if connection_id is None:
 975            self._remove_client(expected_client_id)
 976        self.remove_connection(connection_id)
 977
 978    def add_connection(self, conn, address):
 979        """
 980        :param conn: socket
 981        :param address: address
 982        :return: client ID
 983        """
 984        if conn is None:
 985            raise TypeError('conn should not be None!')
 986
 987        self.socket_by_fd[conn.fileno()] = conn
 988        conn.setblocking(0)
 989        if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS):
 990            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 991
 992        new_client_id = self._connections_id_gen()
 993
 994        client_info = Connection(new_client_id, (conn, address), self)
 995        self._connections[new_client_id] = client_info
 996        self._connection_by_conn[conn] = new_client_id
 997        self._input_check_sockets.add(conn)
 998        self._exception_check_sockets.add(conn)
 999        self._we_have_connections_for_select = True
1000
1001        self._unconfirmed_clients.add(conn)
1002
1003        return new_client_id
1004
1005    def check_connection_existance(self, connection_id):
1006        if connection_id not in self._expected_clients:
1007            return False
1008        if connection_id not in self._connections:
1009            return False
1010        client_info = self._connections[connection_id]
1011        if not client_info.conn.existence:
1012            return False
1013        conn = client_info.conn.result
1014        if conn is None:
1015            return False
1016        return True
1017
1018    def close_connection(self, connection_id):
1019        if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id))
1020        client_info = self._connections[connection_id]
1021        if not client_info.conn.existence:
1022            if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id))
1023            return
1024        conn = client_info.conn.result
1025        if conn is None:
1026            if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id))
1027            return
1028
1029        del self.socket_by_fd[conn.fileno()]
1030        conn.close()
1031        client_info.conn.existence = False
1032        client_info.output_to_client = copy.copy(client_info.output_to_client)  # clear all output data to free some
1033        #   memory even before destroying
1034
1035        if connection_id in self._connections_marked_as_ready_to_be_closed:
1036            self._connections_marked_as_ready_to_be_closed.remove(connection_id)
1037        if conn in self._connections_marked_to_be_closed_immediately:
1038            self._connections_marked_to_be_closed_immediately.remove(conn)
1039        if conn in self._connection_by_conn:
1040            del self._connection_by_conn[conn]
1041        if conn in self._input_check_sockets:
1042            self._input_check_sockets.remove(conn)
1043        if conn in self._output_check_sockets:
1044            self._output_check_sockets.remove(conn)
1045        if conn in self._exception_check_sockets:
1046            self._exception_check_sockets.remove(conn)
1047
1048        if not self._input_check_sockets:
1049            self._we_have_connections_for_select = False
1050
1051        if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id))
1052
1053    def close_connection_by_conn(self, conn):
1054        # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета.
1055        # И мы сможем обнаружить наличие соответствующей ошибки в коде.
1056        if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn)))
1057        connection_id = self._connection_by_conn[conn]
1058        if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id))
1059        self.close_connection(connection_id)
1060
1061    def remove_connection(self, connection_id):
1062        # client should NOT be removed immediately after connection close (close_connection):
1063        # code should do it by itself after reading all available input data
1064        if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id))
1065        client_info = self._connections[connection_id]
1066        if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword))
1067        if client_info.conn.existence:
1068            self.close_connection(connection_id)
1069        conn = client_info.conn.result
1070        if conn is None:
1071            return
1072
1073        if conn in self._conns_of_unexpected_clients:
1074            self._remove_unexpected_client(self._conns_of_unexpected_clients[conn])
1075
1076        # if conn in self._conns_of_expected_clients:
1077        #     self._remove_client(self._conns_of_expected_clients[conn])
1078        if client_info.connected_expected_client_id is not None:
1079            self._remove_client(client_info.connected_expected_client_id)
1080
1081        client_info.connected_expected_client_id = None
1082        client_info.connected_expected_client = None
1083
1084        if connection_id in self._connections_marked_as_ready_to_be_deleted:
1085            self._connections_marked_as_ready_to_be_deleted.remove(connection_id)
1086
1087        client_info.conn.existence = False
1088        client_info.conn.result = None
1089
1090        del self._connections[connection_id]
1091        if conn in self._connection_by_conn:
1092            del self._connection_by_conn[conn]
1093        if conn in self._input_check_sockets:
1094            self._input_check_sockets.remove(conn)
1095        if conn in self._output_check_sockets:
1096            self._output_check_sockets.remove(conn)
1097        if conn in self._exception_check_sockets:
1098            self._exception_check_sockets.remove(conn)
1099
1100            # client_info.remove()
1101
1102    def remove_connection_by_conn(self, conn):
1103        connection_id = self._connection_by_conn[conn]
1104        self.remove_connection(connection_id)
1105
1106    def _log(self, log_string):
1107        self._internal_log.append(log_string)
1108        if self.echo_log:
1109            print(log_string)
1110            sys.stdout.flush()
1111
1112    def _create_unknown_client_from_connection(self, client_info: Connection):
1113        keyword = None
1114        keyword_is_ok = False
1115        while not keyword_is_ok:
1116            keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode()
1117            if keyword not in self._keywords_for_expected_clients:
1118                keyword_is_ok = True
1119        connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client,
1120                                                 socket_address=client_info.addr.result, keyword=keyword)
1121        expected_client_info = Client(connection_settings)
1122        expected_client_info.id = self._expected_clients_id_gen()
1123
1124        expected_client_info.connection_id = client_info.id
1125        expected_client_info._Client__connection = client_info
1126        expected_client_info.will_use_raw_client_connection = True
1127        expected_client_info.will_use_raw_connection_without_handshake = True
1128        expected_client_info.this_is_unknown_client = True
1129        if self.class_for_unknown_clients_inline_processing is not None:
1130            self._set_inline_processor_for_client(client_info, expected_client_info,
1131                                                  self.class_for_unknown_clients_inline_processing)
1132
1133        self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id
1134        client_info.connected_expected_client_id = expected_client_info.id
1135        client_info.connected_expected_client = expected_client_info
1136        self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id
1137        self._expected_clients[expected_client_info.id] = expected_client_info
1138
1139        self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id)
1140
1141        return expected_client_info.id
1142
1143    def _set_inline_processor_for_client(self, connection_info: Connection,
1144                                         expected_client_info: Client,
1145                                         class_for_unknown_clients_inline_processing: type):
1146        assert type(class_for_unknown_clients_inline_processing) == type, \
1147            '.class_for_unknown_clients_inline_processing must be a class or None'
1148        keyword = expected_client_info.connection_settings.keyword
1149        expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing(
1150            expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type,
1151            connection_info.conn.result.proto, copy.copy(connection_info.addr_info),
1152            copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters
1153        )
1154        connection_info.has_inline_processor = True
1155        self._inline_processor__init_parameters(connection_info, expected_client_info)
1156
1157    def _connect_to_super_server(self, expected_client_info: Client):
1158        """
1159        Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()).
1160        Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую
1161        логику через select/poll/epoll:
1162        http://man7.org/linux/man-pages/man2/connect.2.html
1163        :param expected_client_info:
1164        :return:
1165        """
1166        connection_settings = expected_client_info.connection_settings
1167        conn = None
1168        try:
1169            conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type,
1170                                 connection_settings.socket_protocol, connection_settings.socket_fileno)
1171            self.socket_by_fd[conn.fileno()] = conn
1172            # conn.setblocking(0)
1173        except (socket.error, OSError) as err:
1174            if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format(
1175                err.errno, err.strerror))
1176            raise
1177
1178        try:
1179            conn.connect(connection_settings.socket_address)
1180        except (TimeoutError, socket.error, OSError) as err:
1181            # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout
1182            del self.socket_by_fd[conn.fileno()]
1183            conn.close()
1184            if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format(
1185                connection_settings.socket_address, err.errno, err.strerror))
1186            raise
1187
1188        super_server_client_id = self.add_connection(conn, connection_settings.socket_address)
1189
1190        addr_info = host_names = None
1191        try:
1192            if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS):
1193                addr_info = socket.getaddrinfo(connection_settings.socket_address[0],
1194                                               connection_settings.socket_address[1])
1195                host_names = socket.gethostbyaddr(connection_settings.socket_address[0])
1196        except ConnectionError as err:
1197            # An established connection was aborted by the software in your host machine
1198            if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1199            if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1200                connection_settings.socket_address, err.errno, err.strerror))
1201            self._mark_connection_to_be_closed_immediately(super_server_client_id)
1202            ok = False
1203        except (socket.error, OSError) as err:
1204            if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1205                connection_settings.socket_address, err.errno, err.strerror))
1206            if err.errno in SET_OF_CONNECTION_ERRORS:
1207                # An established connection was aborted by the software in your host machine
1208                if __debug__: self._log(
1209                    'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1210                if __debug__: self._log(
1211                    'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1212                        connection_settings.socket_address, err.errno, err.strerror))
1213                self._mark_connection_to_be_closed_immediately(super_server_client_id)
1214                ok = False
1215            else:
1216                if 'nt' == os.name:
1217                    if errno.WSAECONNRESET == err.errno:
1218                        # An existing connection was forcibly closed by the remote host
1219                        if __debug__: self._log(
1220                            'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1221                        if __debug__: self._log(
1222                            'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1223                                connection_settings.socket_address, err.errno, err.strerror))
1224                        self._mark_connection_to_be_closed_immediately(super_server_client_id)
1225                        ok = False
1226                    else:
1227                        raise err
1228                else:
1229                    raise err
1230
1231        super_server_client_info = self._connections[super_server_client_id]
1232        super_server_client_info.addr_info = addr_info
1233        super_server_client_info.host_names = host_names
1234        self._log_new_connection(super_server_client_info, False)
1235
1236        if expected_client_info.will_use_raw_connection_without_handshake:
1237            # Connection is made without handshake
1238            super_server_client_info.this_is_raw_connection = True
1239            self._unconfirmed_clients.remove(super_server_client_info.conn.result)
1240            self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id)
1241        else:
1242            keyword = expected_client_info.connection_settings.keyword
1243            if keyword is None:
1244                keyword = self._get_own_keyword_appropriate_for_connection(conn)
1245            if keyword is None:
1246                # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления
1247                self._mark_connection_to_be_closed_immediately(super_server_client_info)
1248                self._mark_connection_as_ready_for_deletion(super_server_client_info)
1249                raise Exception('Own keyword should be provided in connection_settings!')
1250            if keyword:
1251                # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу).
1252                self._send_message_through_connection(super_server_client_info, keyword)
1253
1254        expected_client_info.connection_id = super_server_client_id
1255        expected_client_info._Client__connection = super_server_client_info
1256        self._conns_of_expected_clients[conn] = expected_client_info.id
1257        super_server_client_info.connected_expected_client_id = expected_client_info.id
1258        super_server_client_info.connected_expected_client = expected_client_info
1259
1260    def _get_own_keyword_appropriate_for_connection(self, conn):
1261        keyword = None
1262        random_keyword = None
1263        for gate_connection_settings in self.gates_connections_settings:
1264            random_keyword = gate_connection_settings.keyword
1265            if (conn.family == gate_connection_settings.socket_family) and (
1266                conn.type == gate_connection_settings.socket_type) and \
1267                    (conn.proto == gate_connection_settings.socket_protocol):
1268                keyword = gate_connection_settings.keyword
1269        if keyword is None:
1270            keyword = random_keyword
1271        return keyword
1272
1273    def _pack_message(self, data):
1274        return len(data).to_bytes(self.message_size_len, 'little') + data
1275
1276    def _remove_client(self, expected_client_id):
1277        expected_client_info = self._expected_clients[expected_client_id]
1278        del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword]
1279        del self._expected_clients[expected_client_id]
1280        connection_info = expected_client_info._Client__connection
1281        if connection_info is not None:
1282            # you can remove expected client before it will be connected to the server
1283            del self._conns_of_expected_clients[connection_info.conn.result]
1284        expected_client_info.connection_id = None
1285        expected_client_info._Client__connection = None
1286
1287    def _add_unexpected_client(self, connection_id, keyword):
1288        connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, keyword=keyword)
1289        unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(),
1290                                        connection_id)
1291
1292        self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info
1293        self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id
1294        connection_info = unexpected_client_info._Client__connection
1295        self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id
1296
1297        return unexpected_client_info.id
1298
1299    def _remove_unexpected_client(self, unexpected_client_id):
1300        unexpected_client_info = self._unexpected_clients[unexpected_client_id]
1301        connection_info = unexpected_client_info._Client__connection  # unexpected client appears only after
1302        #   connection
1303        del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword]
1304        del self._unexpected_clients[unexpected_client_id]
1305        del self._conns_of_unexpected_clients[connection_info.conn.result]
1306        unexpected_client_info.connection_id = None
1307        unexpected_client_info._Client__connection = None
1308
1309    # @profile
1310    def _accept_new_connection(self, readable_socket: socket.socket):
1311        # One of a "readable" self._gate sockets is ready to accept a connection
1312        ok = True
1313        while ok:
1314            connection_id = None
1315            client_info = None
1316
1317            connection = None
1318            client_address = None
1319            try:
1320                connection, client_address = readable_socket.accept()
1321                self.socket_by_fd[connection.fileno()] = connection
1322                connection_id = self.add_connection(connection, client_address)
1323                client_info = self._connections[connection_id]
1324            except BlockingIOError as err:
1325                ok = False
1326            except InterruptedError as err:
1327                pass
1328            except (socket.error, OSError) as err:
1329                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
1330                    ok = False
1331                elif errno.EINTR == err.errno:
1332                    pass
1333                elif errno.ECONNABORTED == err.errno:
1334                    ok = False
1335                elif errno.EMFILE == err.errno:
1336                    if __debug__: self._log(
1337                        'The per-process limit on the number of open file descriptors had been reached.')
1338                    ok = False
1339                elif errno.ENFILE == err.errno:
1340                    if __debug__: self._log(
1341                        'The system-wide limit on the total number of open files had been reached.')
1342                    ok = False
1343                elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno):
1344                    if __debug__: self._log(
1345                        'Not enough free memory. Allocation is limited by the socket buffer limits.')
1346                    ok = False
1347                elif errno.EPROTO == err.errno:
1348                    if __debug__: self._log('Protocol error.')
1349                    ok = False
1350                elif errno.EPERM == err.errno:
1351                    if __debug__: self._log('Firewall rules forbid connection.')
1352                    ok = False
1353                else:
1354                    raise err
1355
1356            if (connection is not None) and (client_info is not None):
1357                addr_info = host_names = None
1358                try:
1359                    if self.should_get_client_addr_info_on_connection and \
1360                            (connection.family in INET_TYPE_CONNECTIONS):
1361                        addr_info = socket.getaddrinfo(client_address[0], client_address[1])
1362                        host_names = socket.gethostbyaddr(client_address[0])
1363                except ConnectionError as err:
1364                    # An established connection was aborted by the software in your host machine
1365                    if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address))
1366                    if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1367                        client_address, err.errno, err.strerror))
1368                    self._mark_connection_to_be_closed_immediately(client_info)
1369                    ok = False
1370                except (socket.error, OSError) as err:
1371                    if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1372                        client_address, err.errno, err.strerror))
1373                    if err.errno in SET_OF_CONNECTION_ERRORS:
1374                        # An established connection was aborted by the software in your host machine
1375                        if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address))
1376                        if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1377                            client_address, err.errno, err.strerror))
1378                        self._mark_connection_to_be_closed_immediately(client_info)
1379                        ok = False
1380                    else:
1381                        if 'nt' == os.name:
1382                            if errno.WSAECONNRESET == err.errno:
1383                                # An existing connection was forcibly closed by the remote host
1384                                if __debug__: self._log(
1385                                    'CLOSING {}: Connection reset by peer'.format(client_address))
1386                                if __debug__: self._log(
1387                                    'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1388                                        client_address, err.errno, err.strerror))
1389                                self._mark_connection_to_be_closed_immediately(client_info)
1390                                ok = False
1391                            else:
1392                                raise err
1393                        else:
1394                            raise err
1395
1396                client_info.addr_info = addr_info
1397                client_info.host_names = host_names
1398                self._log_new_connection(client_info, True)
1399
1400                if self.need_to_auto_check_incoming_raw_connection \
1401                        and self.raw_checker_for_new_incoming_connections(self, client_info):
1402                    # Is should be RAW:
1403                    if self.unknown_clients_are_allowed:
1404                        # Unknown clients are allowed - create Unknown Expected Client for this connection
1405                        client_info.this_is_raw_connection = True
1406                        self._unconfirmed_clients.remove(client_info.conn.result)
1407                        self._create_unknown_client_from_connection(client_info)
1408                    else:
1409                        # Unknown clients are Non allowed - close connection.
1410                        if __debug__: self._log(
1411                            'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format(
1412                                client_info.addr.result
1413                            ))
1414                        self._mark_connection_to_be_closed_immediately(client_info)
1415
1416    # @profile
1417    def _read_data_from_already_connected_socket__inner__memory_optimized(self, curr_client_info: Connection,
1418                                                                          possible_messages_in_client_input_fifo=False):
1419        ok = True
1420
1421        data = curr_client_info.conn.result.recv(curr_client_info.recv_buff_size)
1422        data_len = len(data)
1423        curr_client_info.calc_new_recv_buff_size(data_len)
1424        if data:
1425            data = memoryview(data)
1426            if curr_client_info.this_is_raw_connection:
1427                curr_client_info.input_from_client.put(data)
1428                possible_messages_in_client_input_fifo = True
1429            else:
1430                curr_client_info.raw_input_from_client.add_piece_of_data(data)
1431                possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo(
1432                    curr_client_info)
1433        else:
1434            # Interpret empty result as closed connection
1435            if __debug__: self._log(
1436                'CLOSING {} after reading no data:'.format(curr_client_info.addr.result))
1437            self._mark_connection_to_be_closed_immediately(curr_client_info)
1438            ok = False
1439
1440        result = (ok, possible_messages_in_client_input_fifo)
1441        return result
1442
1443    # @profile
1444    def _read_data_from_already_connected_socket__inner__speed_optimized(self, curr_client_info: Connection,
1445                                                                         possible_messages_in_client_input_fifo=False):
1446        ok = True
1447
1448        if curr_client_info.current_memoryview_input:
1449            nbytes = 0
1450            try:
1451                nbytes = curr_client_info.conn.result.recv_into(curr_client_info.current_memoryview_input)
1452            except TimeoutError:
1453                # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout
1454                pass
1455
1456            if nbytes > 0:
1457                data = curr_client_info.current_memoryview_input[:nbytes]
1458                curr_client_info.current_memoryview_input = curr_client_info.current_memoryview_input[nbytes:]
1459
1460                if curr_client_info.this_is_raw_connection:
1461                    curr_client_info.input_from_client.put(data)
1462                    possible_messages_in_client_input_fifo = True
1463                else:
1464                    curr_client_info.raw_input_from_client.add_piece_of_data(data)
1465                    possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo(
1466                        curr_client_info)
1467            else:
1468                # Interpret empty result as closed connection
1469                if __debug__: self._log(
1470                    'CLOSING {} after reading no data:'.format(curr_client_info.addr.result))
1471                self._mark_connection_to_be_closed_immediately(curr_client_info)
1472                ok = False
1473        else:
1474            input_buffer = bytearray(self.socket_read_fixed_buffer_size.result)
1475            curr_client_info.current_memoryview_input = memoryview(input_buffer)
1476
1477        result = (ok, possible_messages_in_client_input_fifo)
1478        return result
1479
1480    # @profile
1481    def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket):
1482        possible_messages_in_client_input_fifo = False
1483
1484        curr_client_id = self._connection_by_conn[readable_socket]
1485        curr_client_info = self._connections[curr_client_id]
1486
1487        ok = True
1488        while ok:
1489            try:
1490                if self.use_speed_optimized_socket_read:
1491                    ok, possible_messages_in_client_input_fifo = \
1492                        self._read_data_from_already_connected_socket__inner__speed_optimized(
1493                            curr_client_info, possible_messages_in_client_input_fifo)
1494                else:
1495                    ok, possible_messages_in_client_input_fifo = \
1496                        self._read_data_from_already_connected_socket__inner__memory_optimized(
1497                            curr_client_info, possible_messages_in_client_input_fifo)
1498                break  # makes IO faster on 10-30% on all modes (message/raw, data/http,
1499                #   static_read_buffer/non_static_read_buffer).
1500                # Ускорение происходит за счет того, что: 1) данных оказывается не настолько много чтобы кеш процессора
1501                # переполнялся и требовалась бы работа с оперативной памятью; 2) при таком разбиении ввод и вывод
1502                # начинают происходить (на уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и
1503                # пустили в обработку. В это время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы.
1504                # После обработки мы пускаем данные на отправку и забираем очередную порцию данных из входного буфера.
1505                # В это время происходит отправка данных из буферов на уровне ОС. И т.д.
1506            except BlockingIOError as err:
1507                ok = False
1508            except InterruptedError as err:
1509                pass
1510            except ConnectionError as err:
1511                # An established connection was aborted by the software in your host machine
1512                if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1513                if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1514                    curr_client_info.addr.result, err.errno, err.strerror))
1515                self._mark_connection_to_be_closed_immediately(curr_client_info)
1516                ok = False
1517            except (socket.error, OSError) as err:
1518                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
1519                    ok = False
1520                elif errno.EINTR == err.errno:
1521                    pass
1522                elif err.errno in SET_OF_CONNECTION_ERRORS:
1523                    # An established connection was aborted by the software in your host machine
1524                    if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1525                    if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1526                        curr_client_info.addr.result, err.errno, err.strerror))
1527                    self._mark_connection_to_be_closed_immediately(curr_client_info)
1528                    ok = False
1529                else:
1530                    if 'nt' == os.name:
1531                        if errno.WSAECONNRESET == err.errno:
1532                            # An existing connection was forcibly closed by the remote host
1533                            if __debug__: self._log(
1534                                'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1535                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1536                                curr_client_info.addr.result, err.errno, err.strerror))
1537                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1538                            ok = False
1539                        elif errno.EHOSTUNREACH == err.errno:
1540                            # OSError: [Errno 113] No route to host
1541                            if __debug__: self._log(
1542                                'CLOSING {}: No route to host'.format(curr_client_info.addr.result))
1543                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1544                                curr_client_info.addr.result, err.errno, err.strerror))
1545                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1546                            ok = False
1547                        else:
1548                            if __debug__: self._log(
1549                                'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result))
1550                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1551                                curr_client_info.addr.result, err.errno, err.strerror))
1552                            raise err
1553                    else:
1554                        if errno.WSAEHOSTUNREACH == err.errno:
1555                            # OSError: [Errno 113] No route to host
1556                            if __debug__: self._log(
1557                                'CLOSING {}: No route to host'.format(curr_client_info.addr.result))
1558                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1559                                curr_client_info.addr.result, err.errno, err.strerror))
1560                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1561                            ok = False
1562                        else:
1563                            if __debug__: self._log(
1564                                'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result))
1565                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1566                                curr_client_info.addr.result, err.errno, err.strerror))
1567                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1568                            ok = False
1569                            # raise err
1570
1571            read_is_forbidden = False
1572            if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \
1573                    > self.global_in__data_size_limit.result:
1574                read_is_forbidden = True
1575                ok = False
1576
1577            read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
1578                read_is_forbidden)
1579            if read_is_forbidden_test is not None:
1580                if read_is_forbidden_test:
1581                    print('Read is suppressed until data will be processed.')
1582                else:
1583                    print('Read is allowed: data is processed.')
1584
1585        return possible_messages_in_client_input_fifo
1586
1587    # @profile
1588    def _read_data_from_socket(self, readable_socket: socket.socket):
1589        possible_messages_in_client_input_fifo = False
1590        if readable_socket in self._gate:
1591            accept_is_forbidden = True
1592            if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \
1593                    <= self.global__data_size_limit.result:
1594                accept_is_forbidden = False
1595                self._accept_new_connection(readable_socket)
1596
1597            if __debug__:
1598                accept_is_forbidden_test = \
1599                    self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger(
1600                        accept_is_forbidden)
1601                if accept_is_forbidden_test is not None:
1602                    if accept_is_forbidden_test:
1603                        print('Accept is suppressed until data will be processed and/or out.')
1604                    else:
1605                        print('Accept is allowed: data is processed and/or out.')
1606        else:
1607            possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell(
1608                readable_socket)
1609
1610        return possible_messages_in_client_input_fifo
1611
1612    def _log_new_connection(self, client_info: Connection, is_incoming_connection):
1613        connection = client_info.conn.result
1614        client_address = client_info.addr.result
1615        addr_info = client_info.addr_info
1616        host_names = client_info.host_names
1617
1618        connection_type_string = 'OUTGOING'
1619        from_to = 'to'
1620        if is_incoming_connection:
1621            connection_type_string = 'INCOMING'
1622            from_to = 'from'
1623
1624        if __debug__: self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address))
1625        if connection.family in {socket.AF_INET, socket.AF_INET6}:
1626            if __debug__: self._log('\taddr_info: {}'.format(addr_info))
1627            if __debug__: self._log('\thost_names: {}'.format(host_names))
1628
1629    # @profile
1630    def _write_data_to_socket(self, curr_client_info: Connection):
1631        # CAUTION: code here is optimized for speed - not for readability or beauty.
1632
1633        expected_client_info = curr_client_info.connected_expected_client
1634        writable_socket = curr_client_info.conn.result
1635        if curr_client_info.should_be_closed:
1636            curr_client_info.current_memoryview_output = None
1637            curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client)
1638            return
1639
1640        ok = True
1641        first_pass = True
1642        can_call__inline_processor__on__output_buffers_are_empty = True
1643        while ok:
1644            try:
1645                if curr_client_info.current_memoryview_output:
1646                    nsent = writable_socket.send(curr_client_info.current_memoryview_output)
1647                    curr_client_info.current_memoryview_output = curr_client_info.current_memoryview_output[nsent:]
1648                else:
1649                    curr_client_info.current_memoryview_output = None
1650                    output_fifo_size = curr_client_info.output_to_client.size()
1651                    if output_fifo_size > 1:
1652                        result_data, result_size, result_qnt = \
1653                            curr_client_info.output_to_client.get_at_least_size(524288)
1654                        if result_qnt > 1:
1655                            curr_client_info.current_memoryview_output = memoryview(b''.join(result_data))
1656                        else:
1657                            curr_client_info.current_memoryview_output = memoryview(result_data.popleft())
1658                    elif output_fifo_size == 1:
1659                        curr_client_info.current_memoryview_output = memoryview(curr_client_info.output_to_client.get())
1660
1661                    if curr_client_info.current_memoryview_output is None:
1662                        # if curr_client_info.ready_to_be_closed:
1663                        #     if first_pass:
1664                        #         # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально.
1665                        #         # Это значит что все данные были отправлены, и можно закрывать соединение.
1666                        #         self._output_check_sockets.remove(writable_socket)
1667                        #         self._mark_connection_to_be_closed_immediately(curr_client_info)
1668                        #     # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда
1669                        #     # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо
1670                        #     # сохранить сокет в списке проверяемых для отправки.
1671                        # else:
1672                        #     self._output_check_sockets.remove(writable_socket)
1673                        if not curr_client_info.ready_to_be_closed:
1674                            # Если соединение помечено как "Готово к закрытию" - то нам надо дождаться момента когда
1675                            # данные будут отправлены, и только в этот момент закрывать соединение. Поэтому надо
1676                            # сохранить сокет в списке проверяемых для отправки.
1677                            self._output_check_sockets.remove(writable_socket)
1678                        if first_pass and curr_client_info.ready_to_be_closed:
1679                            # Т.е. если данных на отправку небыло даже при первом проходе цикла - т.е. изначально.
1680                            # Это значит что все данные были отправлены, и можно закрывать соединение.
1681                            self._output_check_sockets.remove(writable_socket)
1682                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1683                        ok = False
1684                        if expected_client_info:
1685                            self._io_iteration_result.clients_with_empty_output_fifo.add(
1686                                curr_client_info.connected_expected_client_id)
1687                            if curr_client_info.has_inline_processor:
1688                                if can_call__inline_processor__on__output_buffers_are_empty:
1689                                    if self._inline_processor__on__output_buffers_are_empty(curr_client_info,
1690                                                                                            expected_client_info):
1691                                        ok = True
1692                                    can_call__inline_processor__on__output_buffers_are_empty = False
1693            except BlockingIOError as err:
1694                ok = False
1695            except InterruptedError as err:
1696                pass
1697            except ConnectionError as err:
1698                # An established connection was aborted by the software in your host machine
1699                if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1700                if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
1701                    curr_client_info.addr.result, err.errno, err.strerror))
1702                self._mark_connection_to_be_closed_immediately(curr_client_info)
1703                ok = False
1704            except (socket.error, OSError) as err:
1705                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
1706                    ok = False
1707                elif errno.EINTR == err.errno:
1708                    pass
1709                elif err.errno in SET_OF_CONNECTION_ERRORS:
1710                    # Connection reset by peer
1711                    if __debug__: self._log(
1712                        'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result,
1713                                                                           err.strerror))
1714                    if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
1715                        curr_client_info.addr.result, err.errno, err.strerror))
1716                    self._mark_connection_to_be_closed_immediately(curr_client_info)
1717                    ok = False
1718                else:
1719                    if 'nt' == os.name:
1720                        if errno.WSAECONNRESET == err.errno:
1721                            # An existing connection was forcibly closed by the remote host
1722                            if __debug__: self._log(
1723                                'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1724                            if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
1725                                curr_client_info.addr.result, err.errno, err.strerror))
1726                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1727                            ok = False
1728                        else:
1729                            raise err
1730                    else:
1731                        raise err
1732            first_pass = False
1733
1734    def _mark_connection_to_be_closed_immediately(self, client_info: Connection):
1735        client_info.should_be_closed = True
1736        client_info.current_memoryview_input = None
1737        self._connections_marked_to_be_closed_immediately.add(client_info.conn.result)
1738        if client_info.connected_expected_client_id is not None:
1739            self._io_iteration_result.clients_with_disconnected_connection.add(
1740                client_info.connected_expected_client_id)
1741
1742    def _mark_connection_as_ready_to_be_closed(self, client_info: Connection):
1743        client_info.ready_to_be_closed = True
1744
1745    def _mark_connection_as_ready_for_deletion(self, client_info: Connection):
1746        client_info.ready_for_deletion = True
1747        self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result)
1748
1749    def _handle_connection_error(self, writable_socket: socket.socket):
1750        # Data read from already connected client
1751        curr_client_id = self._connection_by_conn[writable_socket]
1752        curr_client_info = self._connections[curr_client_id]
1753        if curr_client_info.should_be_closed:
1754            return
1755        if __debug__: self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result))
1756        self._mark_connection_to_be_closed_immediately(curr_client_info)
1757
1758    # @profile
1759    def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection):
1760        result = False
1761
1762        try:
1763            while True:
1764                if curr_client_info.current_message_length is None:
1765                    current_message_length = curr_client_info.raw_input_from_client.get_data(self.message_size_len)
1766                    if current_message_length is None:
1767                        break
1768
1769                    curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little')
1770
1771                current_message = curr_client_info.raw_input_from_client.get_data(
1772                    curr_client_info.current_message_length)
1773                if current_message is None:
1774                    break
1775                else:
1776                    curr_client_info.input_from_client.put(current_message)
1777                    curr_client_info.current_message_length = None
1778                    result = True
1779        except TypeError:
1780            pass
1781
1782        return result
1783
1784    def _send_message_through_connection(self, client_info: Connection, data):
1785        if client_info.conn.existence:
1786            client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little'))
1787            client_info.output_to_client.put(data)
1788
1789            self._output_check_sockets.add(client_info.conn.result)
1790        else:
1791            if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data))
1792            raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!')
1793
1794    def _generate_list_of_messages_with_their_length(self, messages_list):
1795        for message in messages_list:
1796            yield len(message).to_bytes(self.message_size_len, 'little')
1797            yield message
1798
1799    def _send_messages_through_connection(self, client_info: Connection, messages_list):
1800        if client_info.conn.existence:
1801            client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list))
1802
1803            self._output_check_sockets.add(client_info.conn.result)
1804        else:
1805            if __debug__: self._log(
1806                'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list))
1807            raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!')
1808
1809    def _send_message_through_connection_raw(self, client_info: Connection, data):
1810        if client_info.conn.existence:
1811            client_info.output_to_client.put(data)
1812            self._output_check_sockets.add(client_info.conn.result)
1813        else:
1814            if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data))
1815            raise Exception(
1816                'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!')
1817
1818    def _send_messages_through_connection_raw(self, client_info: Connection, messages_list):
1819        if client_info.conn.existence:
1820            client_info.output_to_client.extend(messages_list)
1821            self._output_check_sockets.add(client_info.conn.result)
1822        else:
1823            if __debug__: self._log(
1824                'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list))
1825            raise Exception(
1826                'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!')
1827
1828    def _move_message_from_fifo_to_memoryview(self, client_info: Connection):
1829        if client_info.current_memoryview_output is None:
1830            if client_info.output_to_client.size():
1831                client_info.current_memoryview_output = memoryview(client_info.output_to_client.get())
1832
1833    # @profile
1834    def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection):
1835        output_fifo_size = client_info.output_to_client.size()
1836        if output_fifo_size > 1:
1837            result_data, result_size, result_qnt = \
1838                client_info.output_to_client.get_at_least_size(524288)
1839            if result_qnt > 1:
1840                client_info.current_memoryview_output = memoryview(b''.join(result_data))
1841            else:
1842                client_info.current_memoryview_output = memoryview(result_data.popleft())
1843        elif output_fifo_size == 1:
1844            client_info.current_memoryview_output = memoryview(client_info.output_to_client.get())
1845
1846    def _process_client_keyword(self, client_socket: socket.socket):
1847        curr_client_id = self._connection_by_conn[client_socket]
1848        curr_client_info = self._connections[curr_client_id]
1849
1850        if curr_client_info.input_from_client.size() >= 0:
1851            expected_client_id = None
1852            expected_client_info = None
1853
1854            this_is_super_server_client = False
1855            if curr_client_info.connected_expected_client is not None:
1856                expected_client_info = curr_client_info.connected_expected_client
1857                expected_client_id = expected_client_info.id
1858                if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role:
1859                    this_is_super_server_client = True
1860
1861            if this_is_super_server_client:
1862                # This is connection to Super-Server. So we expect an answer like b'OK'
1863                super_server_answer__keyword_accepted = curr_client_info.input_from_client.get()
1864                super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted)
1865                if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted:
1866                    # Answer was acceptable
1867                    self._unconfirmed_clients.remove(client_socket)
1868                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id)
1869                    if expected_client_info.will_use_raw_client_connection:
1870                        curr_client_info.this_is_raw_connection = True
1871                else:
1872                    # Answer was NOT acceptable
1873                    self._mark_connection_to_be_closed_immediately(curr_client_info)
1874                    self._mark_connection_as_ready_for_deletion(curr_client_info)
1875                    if __debug__: self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format(
1876                        super_server_answer__keyword_accepted))
1877            else:
1878                # This is connection to client. So we expect a keyword
1879                keyword = curr_client_info.input_from_client.get()
1880                keyword = bytes(keyword)
1881                curr_client_info.keyword = keyword
1882                self._unconfirmed_clients.remove(client_socket)
1883                self._send_message_through_connection(curr_client_info, self.server_answer__keyword_accepted)
1884
1885                if keyword in self._keywords_for_expected_clients:
1886                    # empty expected client was already registered
1887                    expected_client_id = self._keywords_for_expected_clients[keyword]
1888                    expected_client_info = self._expected_clients[expected_client_id]
1889                    expected_client_info.connection_id = curr_client_id
1890                    expected_client_info._Client__connection = curr_client_info
1891                    self._conns_of_expected_clients[client_socket] = expected_client_id
1892                    curr_client_info.connected_expected_client_id = expected_client_id
1893                    curr_client_info.connected_expected_client = expected_client_info
1894                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id)
1895                    if expected_client_info.will_use_raw_client_connection:
1896                        curr_client_info.this_is_raw_connection = True
1897                else:
1898                    # it is unknown expected client
1899                    if self.unexpected_clients_are_allowed:
1900                        self._add_unexpected_client(curr_client_id, keyword)
1901                    else:
1902                        self._mark_connection_to_be_closed_immediately(curr_client_info)
1903                        self._mark_connection_as_ready_for_deletion(curr_client_info)
1904
1905    def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket):
1906        client_info = self._connections[self._connection_by_conn[readable_socket]]
1907        if client_info.connected_expected_client_id is not None:
1908            if client_info.input_from_client.size():
1909                self._io_iteration_result.clients_have_data_to_read.add(
1910                    client_info.connected_expected_client_id)
1911                if client_info.has_inline_processor:
1912                    self._inline_processor__on__data_received(client_info)
1913
1914    def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket):
1915        if readable_socket in self._conns_of_expected_clients:
1916            expected_client_id = self._conns_of_expected_clients[readable_socket]
1917            expected_client = self._expected_clients[expected_client_id]
1918            self._io_iteration_result.clients_have_data_to_read.add(expected_client_id)
1919            client_info = expected_client._Client__connection
1920            if client_info.has_inline_processor:
1921                self._inline_processor__on__data_received(client_info)
1922
1923    def _inline_processor__apply_parameters(self, connection_info: Connection,
1924                                            expected_client: Client):
1925        inline_processor = expected_client.obj_for_inline_processing
1926
1927        inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode
1928        connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode
1929
1930        if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately:
1931            inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False
1932            self._mark_connection_to_be_closed_immediately(connection_info)
1933
1934        if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed:
1935            inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False
1936            self._mark_connection_as_ready_to_be_closed(connection_info)
1937
1938    def _inline_processor__init_parameters(self, connection_info: Connection,
1939                                           expected_client: Client):
1940        inline_processor = expected_client.obj_for_inline_processing
1941
1942        inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection
1943        inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection
1944
1945    def _inline_processor__on__data_received(self, connection_info: Connection):
1946        expected_client = connection_info.connected_expected_client
1947        inline_processor = expected_client.obj_for_inline_processing
1948
1949        try:
1950            while connection_info.input_from_client.size():
1951                inline_processor.on__data_received(connection_info.input_from_client.get())
1952
1953            if inline_processor.output_messages:
1954                while inline_processor.output_messages:
1955                    another_message = inline_processor.output_messages.popleft()
1956                    if not connection_info.this_is_raw_connection:
1957                        connection_info.output_to_client.put(
1958                            len(another_message).to_bytes(self.message_size_len, 'little'))
1959                    connection_info.output_to_client.put(another_message)
1960                self._output_check_sockets.add(connection_info.conn.result)
1961                if connection_info.output_to_client.get_data_full_size() >= 65536:
1962                    self._write_data_to_socket(connection_info)
1963                return True
1964        except:
1965            self.remove_client(expected_client.id)
1966            exc = sys.exc_info()
1967            exception = exc
1968            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
1969            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
1970            exception = exception[:2] + (formatted_traceback,)
1971            trace_str = ''.join(exception[2])
1972            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
1973            if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string))
1974
1975        return False
1976
1977    def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection,
1978                                                        expected_client: Client):
1979        inline_processor = expected_client.obj_for_inline_processing
1980
1981        if not connection_info.has_inline_processor:
1982            return False
1983
1984        try:
1985            inline_processor.on__output_buffers_are_empty()
1986
1987            if inline_processor.output_messages:
1988                while inline_processor.output_messages:
1989                    another_message = inline_processor.output_messages.popleft()
1990                    if not connection_info.this_is_raw_connection:
1991                        connection_info.output_to_client.put(
1992                            len(another_message).to_bytes(self.message_size_len, 'little'))
1993                    connection_info.output_to_client.put(another_message)
1994                self._output_check_sockets.add(connection_info.conn.result)
1995                if connection_info.output_to_client.get_data_full_size() >= 65536:
1996                    # self._write_data_to_socket(connection_info.conn.result)
1997                    self._write_data_to_socket(connection_info)
1998                return True
1999        except:
2000            self.remove_client(expected_client.id)
2001            exc = sys.exc_info()
2002            exception = exc
2003            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
2004            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
2005            exception = exception[:2] + (formatted_traceback,)
2006            trace_str = ''.join(exception[2])
2007            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
2008            if __debug__: self._log(
2009                'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string))
2010
2011        return False
2012
2013    def _inline_processor__on__connection_lost(self, connection_info: Connection,
2014                                               expected_client: Client):
2015        inline_processor = expected_client.obj_for_inline_processing
2016
2017        if not connection_info.has_inline_processor:
2018            return False
2019
2020        try:
2021            inline_processor.on__connection_lost()
2022        except:
2023            exc = sys.exc_info()
2024            exception = exc
2025            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
2026            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
2027            exception = exception[:2] + (formatted_traceback,)
2028            trace_str = ''.join(exception[2])
2029            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
2030            if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string))
2031        self.remove_client(expected_client.id)
2032
2033    def _inline_processor__on__connection_lost_by_connection_id(self, connection_id):
2034        connection_info = self._connections[connection_id]
2035        expected_client_info = connection_info.connected_expected_client
2036        if (expected_client_info is not None) and connection_info.has_inline_processor:
2037            self._inline_processor__on__connection_lost(connection_info, expected_client_info)
2038            self._io_iteration_result.clients_with_disconnected_connection.remove(
2039                expected_client_info.id)
2040
2041    def _unlink_good_af_unix_sockets(self):
2042        if 'posix' == os.name:
2043            for gate_connection_settings in self.gates_connections_settings:
2044                if gate_connection_settings.socket_family == socket.AF_UNIX:
2045                    try:
2046                        os.unlink(gate_connection_settings.socket_address)
2047                    except:
2048                        if __debug__: self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format(
2049                            gate_connection_settings.socket_address))
2050                        raise
2051
2052    def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings):
2053        if 'posix' == os.name:
2054            if connection_settings.socket_family == socket.AF_UNIX:
2055                if os.path.exists(connection_settings.socket_address):
2056                    if __debug__: self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format(
2057                        connection_settings.socket_address))
2058
class ThereAreNoGateConections(Exception):
    """Raised by ``asock_io_core_connect`` when gate connections are required but none was created."""
2061
class NotEnoughGateConnections(Exception):
    """Raised by ``asock_io_core_connect`` when not every desired gate connection was created."""
2064
@contextmanager
def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool=False, backlog: int=1,
                          should_have_all_desired_gate_connections: bool=False):
    """Context manager that starts listening on the gates and destroys the core on exit.

    On exit (normal or exceptional) it runs one IO iteration, destroys the core and
    runs one more IO iteration.

    :param asock_io_core_obj: IO core to manage
    :param should_have_gate_connections: require at least one created gate connection
    :param backlog: value passed to ``asock_io_core_obj.listen``
    :param should_have_all_desired_gate_connections: require as many gate connections
        as there are entries in ``gates_connections_settings``
    :raises ThereAreNoGateConections: gate connections are required but none was created
    :raises NotEnoughGateConnections: fewer gate connections were created than desired
    """
    try:
        desired_amount_of_gate_connections = len(asock_io_core_obj.gates_connections_settings)
        gate_connections_num = asock_io_core_obj.listen(backlog)

        if should_have_gate_connections and (not gate_connections_num):
            error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!'
            asock_io_core_obj._log(error_text)
            raise ThereAreNoGateConections(error_text)

        if should_have_all_desired_gate_connections \
                and desired_amount_of_gate_connections != gate_connections_num:
            error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: NOT ENOUGH GOOD GATE CONNECTIONS!'
            asock_io_core_obj._log(error_text)
            raise NotEnoughGateConnections(error_text)

        print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num))
        yield asock_io_core_obj
    finally:
        asock_io_core_obj.io_iteration()
        asock_io_core_obj.destroy()
        asock_io_core_obj.io_iteration()
        if __debug__: print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150]))
        if __debug__: print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150]))
2091
2092
class CheckIsRawConnection:
    """Callable that decides whether a connection of an unknown client is a RAW one."""

    def __call__(self, asock_io_core: ASockIOCore, connection_info: Connection)->bool:
        """
        :param asock_io_core: IO core; its ``set_of_gate_addresses`` is consulted
        :param connection_info: connection to check
        :return: "True" if it is a RAW connection for an Unknown Client, i.e. an
            AF_INET/AF_INET6 connection whose peer address is not one of the gate
            addresses. "False" otherwise, including when the check itself fails.
        """
        try:
            is_inet = connection_info.conn.result.family in {socket.AF_INET, socket.AF_INET6}
            if is_inet and connection_info.addr.result[0] not in asock_io_core.set_of_gate_addresses:
                # Connected not from a local (gate) IP address.
                return True
        except:
            pass
        return False
class IoIterationResult:
 73class IoIterationResult:
 74    """
 75    ([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл сокет которых был отключен по причине ошибки
 76    (сами ОКл еще небыли удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых
 77    сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать
 78    новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве
 79    случаев блокируется при вызове IO-операции; кроме случаев когда был задействован ассинхронный интерфейс,
 80    при котором актор отправляет запрос не требуя ответа об успешном окончании операции (без какого-либо контроля
 81    успешности, или же с ручным контролем путем вызова спец-метода, который-бы и проводил проверку, или же
 82    считывание имеющихся результатов операций)))
 83    """
 84
 85    def __init__(self):
 86        self.newly_connected_expected_clients = set()
 87        self.newly_connected_unknown_clients = set()
 88        self.clients_with_disconnected_connection = set()
 89        self.clients_have_data_to_read = set()
 90        self.clients_with_empty_output_fifo = set()
 91
 92    def update(self, other):
 93        self.newly_connected_expected_clients.update(other.newly_connected_expected_clients)
 94        self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients)
 95        self.clients_with_disconnected_connection.update(
 96            other.clients_with_disconnected_connection)
 97        self.clients_have_data_to_read.update(other.clients_have_data_to_read)
 98        self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo)
 99
100    def remove(self, item):
101        if item in self.newly_connected_expected_clients:
102            self.newly_connected_expected_clients.remove(item)
103        if item in self.newly_connected_unknown_clients:
104            self.newly_connected_unknown_clients.remove(item)
105        if item in self.clients_with_disconnected_connection:
106            self.clients_with_disconnected_connection.remove(item)
107        if item in self.clients_have_data_to_read:
108            self.clients_have_data_to_read.remove(item)
109        if item in self.clients_with_empty_output_fifo:
110            self.clients_with_empty_output_fifo.remove(item)
111
112    def clear(self):
113        self.newly_connected_expected_clients.clear()
114        self.newly_connected_unknown_clients.clear()
115        self.clients_with_disconnected_connection.clear()
116        self.clients_have_data_to_read.clear()
117        self.clients_with_empty_output_fifo.clear()

([1] подключившиеся ожидаемые клиенты (ОКл); [2] ОКл, сокет которых был отключен по причине ошибки (сами ОКл еще не были удалены - удаление нужно инициировать явно); [3] ОКл имеет очередь непрочитанных принятых сообщений; [4] размер очереди неотправленных сообщений ОКл меньше порогового, а значит в нее можно записывать новые запросы (не уверен пока в надобности этого параметра. Скорее всего он не нужен: актор в большинстве случаев блокируется при вызове IO-операции; кроме случаев, когда был задействован асинхронный интерфейс, при котором актор отправляет запрос, не требуя ответа об успешном окончании операции (без какого-либо контроля успешности, или же с ручным контролем путем вызова спец-метода, который бы и проводил проверку, или же считывание имеющихся результатов операций)))

newly_connected_expected_clients
newly_connected_unknown_clients
clients_with_disconnected_connection
clients_have_data_to_read
clients_with_empty_output_fifo
def update(self, other):
92    def update(self, other):
93        self.newly_connected_expected_clients.update(other.newly_connected_expected_clients)
94        self.newly_connected_unknown_clients.update(other.newly_connected_unknown_clients)
95        self.clients_with_disconnected_connection.update(
96            other.clients_with_disconnected_connection)
97        self.clients_have_data_to_read.update(other.clients_have_data_to_read)
98        self.clients_with_empty_output_fifo.update(other.clients_with_empty_output_fifo)
def remove(self, item):
100    def remove(self, item):
101        if item in self.newly_connected_expected_clients:
102            self.newly_connected_expected_clients.remove(item)
103        if item in self.newly_connected_unknown_clients:
104            self.newly_connected_unknown_clients.remove(item)
105        if item in self.clients_with_disconnected_connection:
106            self.clients_with_disconnected_connection.remove(item)
107        if item in self.clients_have_data_to_read:
108            self.clients_have_data_to_read.remove(item)
109        if item in self.clients_with_empty_output_fifo:
110            self.clients_with_empty_output_fifo.remove(item)
def clear(self):
112    def clear(self):
113        self.newly_connected_expected_clients.clear()
114        self.newly_connected_unknown_clients.clear()
115        self.clients_with_disconnected_connection.clear()
116        self.clients_have_data_to_read.clear()
117        self.clients_with_empty_output_fifo.clear()
class ASockIOCoreMemoryManagement(cengal.io.asock_io.versions.v_0.base.IOCoreMemoryManagement):
class ASockIOCoreMemoryManagement(IOCoreMemoryManagement):
    """``IOCoreMemoryManagement`` extended with the fixed socket read buffer size setting."""

    def __init__(self):
        super().__init__()

        # 1024**2 is the fastest fixed read buffer.
        self.socket_read_fixed_buffer_size = ResultExistence(True, 1024 ** 2)

    def link_to(self, parent):
        """Link to ``parent`` and adopt its ``socket_read_fixed_buffer_size`` if it has one."""
        super().link_to(parent)
        try:
            parent_buffer_size = parent.socket_read_fixed_buffer_size
        except AttributeError:
            return
        self.socket_read_fixed_buffer_size = parent_buffer_size
socket_read_fixed_buffer_size
Inherited Members
cengal.io.asock_io.versions.v_0.base.IOCoreMemoryManagement
global__data_size_limit
global_in__data_size_limit
global_in__data_full_size
global_in__deletable_data_full_size
global_out__data_size_limit
global_out__data_full_size
global_out__deletable_data_full_size
class Connection:
class Connection:
    """State of a single socket connection: socket, address, flags and IO queues."""

    def __init__(self,
                 connection_id=None,
                 connection__conn_addr: tuple = None,
                 global_memory_management: ASockIOCoreMemoryManagement = None
                 ):
        """

        :param connection_id: ID for this connection
        :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address
        :param global_memory_management: global memory management obj.
            NOTE(review): the default ``None`` fails below with AttributeError, so an
            object apparently has to be passed - confirm against callers.
        """
        self.id = connection_id

        has_socket = connection__conn_addr is not None
        self.conn = ResultExistence(has_socket, connection__conn_addr[0] if has_socket else None)
        self.addr = ResultExistence(has_socket, connection__conn_addr[1] if has_socket else None)

        self.addr_info = None
        self.host_names = None

        self.recv_buff_size_computer = RecvBuffSizeComputer()
        self.recv_buff_size = 0
        self.calc_new_recv_buff_size(0)

        # Socket should be closed immediately. For example because of IO error.
        self.should_be_closed = False
        # Socket should be closed, after all messages had been sent to client.
        self.ready_to_be_closed = False
        # Connection should be deleted immediately. For example because of unexpected keyword.
        self.ready_for_deletion = False

        self.keyword = None

        self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl(
            external_data_length=global_memory_management.global_in__data_full_size)
        # Length of current input message (or None, if size was not read yet).
        self.current_message_length = None
        self.input_from_client = FIFODequeWithLengthControl(
            external_data_full_size=global_memory_management.global_in__data_full_size)
        self.current_memoryview_output = None
        self.current_memoryview_input = None
        self.output_to_client = FIFODequeWithLengthControl(
            external_data_full_size=global_memory_management.global_out__data_full_size)

        self.this_is_raw_connection = False

        self.connected_expected_client_id = None
        self.connected_expected_client = None
        self.has_inline_processor = False

    def calc_new_recv_buff_size(self, last_recv_amount):
        """Recompute ``recv_buff_size`` from the amount of data received last time."""
        new_size = self.recv_buff_size_computer.calc_new_recv_buff_size(last_recv_amount)
        self.recv_buff_size = new_size

    def remove(self):
        """Call ``remove()`` on the raw input buffer and on both message queues."""
        for container in (self.raw_input_from_client, self.input_from_client, self.output_to_client):
            container.remove()
Connection( connection_id=None, connection__conn_addr: tuple = None, global_memory_management: ASockIOCoreMemoryManagement = None)
136    def __init__(self,
137                 connection_id=None,
138                 connection__conn_addr: tuple = None,
139                 global_memory_management: ASockIOCoreMemoryManagement = None
140                 ):
141        """
142        
143        :param connection_id: ID for this connection
144        :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address
145        :param global_memory_management: global memory management obj
146        """
147        self.id = connection_id
148        if connection__conn_addr is None:
149            self.conn = ResultExistence(False, None)
150            self.addr = ResultExistence(False, None)
151        else:
152            self.conn = ResultExistence(True, connection__conn_addr[0])
153            self.addr = ResultExistence(True, connection__conn_addr[1])
154
155        self.addr_info = None
156        self.host_names = None
157
158        self.recv_buff_size_computer = RecvBuffSizeComputer()
159        self.recv_buff_size = 0
160        self.calc_new_recv_buff_size(0)
161
162        self.should_be_closed = False  # socket should be closed immediately. For example because of IO error.
163        self.ready_to_be_closed = False  # socket should be closed, after all messages had been sent to client.
164        self.ready_for_deletion = False  # connection should be deleted immediately. For example because of unexpected
165        #   keyword.
166
167        self.keyword = None
168
169        self.raw_input_from_client = DynamicListOfPiecesDequeWithLengthControl(
170            external_data_length=global_memory_management.global_in__data_full_size)
171        self.current_message_length = None  # length of current input message (or None, if size waw not read yet)
172        self.input_from_client = FIFODequeWithLengthControl(
173            external_data_full_size=global_memory_management.global_in__data_full_size)
174        self.current_memoryview_output = None
175        self.current_memoryview_input = None
176        self.output_to_client = FIFODequeWithLengthControl(
177            external_data_full_size=global_memory_management.global_out__data_full_size)
178
179        self.this_is_raw_connection = False
180
181        self.connected_expected_client_id = None
182        self.connected_expected_client = None
183        self.has_inline_processor = False

:param connection_id: ID for this connection :param connection__conn_addr: tuple(conn, addr) where conn is a socket, addr is an address :param global_memory_management: global memory management obj

id
addr_info
host_names
recv_buff_size_computer
recv_buff_size
should_be_closed
ready_to_be_closed
ready_for_deletion
keyword
raw_input_from_client
current_message_length
input_from_client
current_memoryview_output
current_memoryview_input
output_to_client
this_is_raw_connection
connected_expected_client_id
connected_expected_client
has_inline_processor
def calc_new_recv_buff_size(self, last_recv_amount):
185    def calc_new_recv_buff_size(self, last_recv_amount):
186        self.recv_buff_size = self.recv_buff_size_computer.calc_new_recv_buff_size(last_recv_amount)
def remove(self):
188    def remove(self):
189        self.raw_input_from_client.remove()
190        self.input_from_client.remove()
191        self.output_to_client.remove()
class InlineProcessor:
class InlineProcessor:
    """
    Per-client inline processor with empty ``on__*`` callbacks.

    Output messages (or raw output data) are stored in ``self.output_messages``.
    The ``set__*`` / ``mark__*`` methods only remember the requested value and add the client ID to the external
    trigger set given to the constructor (if any); applying the value is up to the owner of that set.
    """
    __slots__ = ('client_id', 'keyword', 'socket_family', 'socket_type', 'socket_proto', 'addr_info', 'host_names',
                 'is_in_raw_mode', '__set__is_in_raw_mode', '__set__mark_socket_as_should_be_closed_immediately',
                 '__set__mark_socket_as_ready_to_be_closed', '__external_parameters_set_trigger', 'output_messages',
                 '__hold__client_id')

    def __init__(self, client_id=None, keyword: Optional[bytes] = None, socket_family=None, socket_type=None,
                 socket_proto=None, addr_info=None, host_names=None,
                 external_parameters_set_trigger: Optional[Set] = None):
        """

        :param client_id: client ID. A private copy is kept, so reassigning ``self.client_id`` later does not change
            the ID that is added to the trigger set
        :param keyword: client keyword. You may check for known keywords to act appropriately
        :param socket_family:
        :param socket_type:
        :param socket_proto:
        :param addr_info: result of socket.getaddrinfo() call
        :param host_names: result of socket.gethostbyaddr() call
        :param external_parameters_set_trigger: set to which the client ID is added each time one of the
            ``set__*`` / ``mark__*`` methods is called. May be None: then nothing is notified
        """
        self.client_id = client_id
        self.keyword = keyword
        self.socket_family = socket_family
        self.socket_type = socket_type
        self.socket_proto = socket_proto
        self.addr_info = addr_info
        self.host_names = host_names
        self.is_in_raw_mode = None
        self.__hold__client_id = client_id
        self.__set__is_in_raw_mode = False
        self.__set__mark_socket_as_should_be_closed_immediately = False
        self.__set__mark_socket_as_ready_to_be_closed = False
        self.__external_parameters_set_trigger = external_parameters_set_trigger

        # self.output_messages = FIFODequeWithLengthControl()
        self.output_messages = deque()
        # self.output_messages = list()

    def on__data_received(self, data: bytes):
        """
        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output
        data.
        Any unhandled exception will force the destruction of the current Inline Processor object. The situation
        will also be logged.
        :param data: a piece of input data if the connection is in RAW mode, and a full message otherwise.
        """
        pass

    def on__output_buffers_are_empty(self):
        """
        Will be called immediately after all output data has been sent.
        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output
        data.
        Any unhandled exception will force the destruction of the current Inline Processor object. The situation
        will also be logged.
        """
        pass

    def on__connection_lost(self):
        """
        Will be called after the connection was closed. The current Inline Processor object will be destroyed after
        this call. An unhandled exception will be logged.
        """
        pass

    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
        """
        Remember the requested RAW-mode state and add the client ID to the trigger set.

        :param is_in_raw_mode: requested state
        """
        self.__set__is_in_raw_mode = is_in_raw_mode
        # The trigger set defaults to None in __init__; without this check the call raised AttributeError.
        if self.__external_parameters_set_trigger is not None:
            self.__external_parameters_set_trigger.add(self.__hold__client_id)

    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
        """
        Remember the requested "should be closed immediately" mark and add the client ID to the trigger set.

        :param mark_socket_as: requested mark value
        """
        self.__set__mark_socket_as_should_be_closed_immediately = mark_socket_as
        # The trigger set defaults to None in __init__; without this check the call raised AttributeError.
        if self.__external_parameters_set_trigger is not None:
            self.__external_parameters_set_trigger.add(self.__hold__client_id)

    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
        """
        Remember the requested "ready to be closed" mark and add the client ID to the trigger set.

        :param mark_socket_as: requested mark value
        """
        self.__set__mark_socket_as_ready_to_be_closed = mark_socket_as
        # The trigger set defaults to None in __init__; without this check the call raised AttributeError.
        if self.__external_parameters_set_trigger is not None:
            self.__external_parameters_set_trigger.add(self.__hold__client_id)
InlineProcessor( client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None, addr_info=None, host_names=None, external_parameters_set_trigger: typing.Set = None)
    def __init__(self, client_id=None, keyword: bytes = None, socket_family=None, socket_type=None, socket_proto=None,
                 addr_info=None, host_names=None, external_parameters_set_trigger: Set = None):
        """
        Store the connection-related information and reset the pending-parameter fields.

        :param client_id: client ID; stored in ``self.client_id`` and in a separate private copy
        :param keyword: client keyword. You may check for known keywords to act appropriately
        :param socket_family:
        :param socket_type:
        :param socket_proto:
        :param addr_info: result of socket.getaddrinfo() call
        :param host_names: result of socket.gethostbyaddr() call
        :param external_parameters_set_trigger: set to which the ``set__*`` / ``mark__*`` methods of this class add
            the private copy of the client ID
        """
        self.client_id = client_id
        self.keyword = keyword
        self.socket_family = socket_family
        self.socket_type = socket_type
        self.socket_proto = socket_proto
        self.addr_info = addr_info
        self.host_names = host_names
        self.is_in_raw_mode = None
        # Private copy of the ID: stays the same even if self.client_id is reassigned later.
        self.__hold__client_id = client_id
        # Requested (pending) values; they are written by the set__* / mark__* methods.
        self.__set__is_in_raw_mode = False
        self.__set__mark_socket_as_should_be_closed_immediately = False
        self.__set__mark_socket_as_ready_to_be_closed = False
        self.__external_parameters_set_trigger = external_parameters_set_trigger

        # self.output_messages = FIFODequeWithLengthControl()
        self.output_messages = deque()
        # self.output_messages = list()

:param keyword: client keyword. You may check for a known keywords to act appropriately :param socket_family: :param socket_type: :param socket_proto: :param addr_info: result of socket.getaddrinfo() call :param host_names: result of socket.gethostbyaddr() call

client_id
keyword
socket_family
socket_type
socket_proto
addr_info
host_names
is_in_raw_mode
output_messages
def on__data_received(self, data: bytes):
    def on__data_received(self, data: bytes):
        """
        Does nothing in this class.
        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output
        data.
        Any unhandled exception will force the destruction of the current Inline Processor object. The situation
        will also be logged.
        :param data: a piece of input data if the connection is in RAW mode, and a full message otherwise.
        """
        pass

Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data. Any unhandled exception will force the destruction of the current Inline Processor object; the situation will also be logged. :param data: a piece of input data if the connection is in RAW mode, and a full message otherwise.

def on__output_buffers_are_empty(self):
    def on__output_buffers_are_empty(self):
        """
        Will be called immediately after all output data has been sent. Does nothing in this class.
        Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output
        data.
        Any unhandled exception will force the destruction of the current Inline Processor object. The situation
        will also be logged.
        """
        pass

Will be called immediately after all output data has been sent. Use self.output_messages (self.output_messages.append(out_message)) to store output messages or raw output data. Any unhandled exception will force the destruction of the current Inline Processor object; the situation will also be logged.

def on__connection_lost(self):
    def on__connection_lost(self):
        """
        Will be called after the connection was closed. The current Inline Processor object will be destroyed after
        this call. An unhandled exception will be logged. Does nothing in this class.
        """
        pass

Will be called after the connection was closed. The current Inline Processor object will be destroyed after this call. An unhandled exception will be logged.

def set__is_in_raw_mode(self, is_in_raw_mode: bool):
254    def set__is_in_raw_mode(self, is_in_raw_mode: bool):
255        self.__set__is_in_raw_mode = is_in_raw_mode
256        self.__external_parameters_set_trigger.add(self.__hold__client_id)
def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
258    def mark__socket_as_should_be_closed_immediately(self, mark_socket_as: bool):
259        self.__set__mark_socket_as_should_be_closed_immediately = mark_socket_as
260        self.__external_parameters_set_trigger.add(self.__hold__client_id)
def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
262    def mark__socket_as_ready_to_be_closed(self, mark_socket_as: bool):
263        self.__set__mark_socket_as_ready_to_be_closed = mark_socket_as
264        self.__external_parameters_set_trigger.add(self.__hold__client_id)
class Client:
class Client:
    """
    Record describing one client: its ID, the ID of its connection, its connection settings and the flags
    describing how its connection will be used.
    """
    def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None):
        """
        Store the given values, validate the settings with ``connection_settings.check()`` and reset all flags.

        :param client_id: ID of the expected client
        :param client_tcp_id: ID of the connection
        :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client,
            and all - for the super server.
        """
        self.id = client_id
        self.connection_id = client_tcp_id
        # Name-mangled to `_Client__connection`. Nothing in this class assigns it a value other than None.
        self.__connection = None  # type: Optional[Connection]
        self.connection_settings = connection_settings
        self.connection_settings.check()

        self.will_use_raw_client_connection = False
        self.will_use_raw_connection_without_handshake = False
        self.this_is_unknown_client = False

        self.obj_for_inline_processing = None

    def get_connection(self)->Connection:
        """
        :return: current value of the private connection attribute (None right after construction)
        """
        return self.__connection
Client( connection_settings: cengal.io.asock_io.versions.v_0.base.ConnectionSettings, client_id=None, client_tcp_id=None)
    def __init__(self, connection_settings: ConnectionSettings, client_id=None, client_tcp_id=None):
        """
        Store the given values, validate the settings with ``connection_settings.check()`` and reset all flags.

        :param client_id: ID of the expected client
        :param client_tcp_id: ID of the connection
        :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client,
            and all - for the super server.
        """
        self.id = client_id
        self.connection_id = client_tcp_id
        # Name-mangled to `_Client__connection`; starts as None.
        self.__connection = None  # type: Optional[Connection]
        self.connection_settings = connection_settings
        self.connection_settings.check()

        self.will_use_raw_client_connection = False
        self.will_use_raw_connection_without_handshake = False
        self.this_is_unknown_client = False

        self.obj_for_inline_processing = None

:param client_id: ID of the expected client :param client_tcp_id: ID of the connection :param connection_settings: useful ConnectionSettings parameters are {direction_role, keyword} - for a client, and all - for the super server.

id
connection_id
connection_settings
will_use_raw_client_connection
will_use_raw_connection_without_handshake
this_is_unknown_client
obj_for_inline_processing
def get_connection(self) -> Connection:
    def get_connection(self)->Connection:
        """
        :return: current value of the private (name-mangled) connection attribute
        """
        return self.__connection
class ASockIOCore(ASockIOCoreMemoryManagement):
 292class ASockIOCore(ASockIOCoreMemoryManagement):
    def __init__(self, gates_connections_settings: Set[ConnectionSettings]):
        """
        Port should not be open to the external world!

        Calls ``check()`` on every given settings object and initialises all bookkeeping containers, ID generators
        and option flags with their default values.

        :param gates_connections_settings: set() of ConnectionSettings(). A falsy value is replaced by an empty set
        :return:
        """
        super(ASockIOCore, self).__init__()

        # The poll object exists only on non-Windows systems; check_sockets() uses the select-based check on 'nt'.
        if os.name != 'nt':
            self.po = select.poll()
        self.last_all_sockets = set()  # type: Set[int]
        self.socket_by_fd = dict()  # type: Dict[int, socket.socket]

        # Timing statistics of check_sockets() calls; updated by io_iteration()
        self.check_sockets_sum_time = 0.0
        self.check_sockets_qnt = 0
        self.check_sockets_max_time = 0.0

        self.gates_connections_settings = gates_connections_settings
        if not self.gates_connections_settings:
            self.gates_connections_settings = set()
            # raise Exception('gates_connections_settings should be provided!')
        # NOTE: the loop variable reuses (shadows) the name of the constructor parameter
        for gates_connections_settings in self.gates_connections_settings:
            gates_connections_settings.check()
        self.faulty_connection_settings = set()
        self._connection_settings_by_gate_conn = dict()

        self.set_of_gate_addresses = set()
        self._gate = set()
        self.reuse_gate_addr = False
        self.reuse_gate_port = False

        self.message_size_len = MESSAGE_SIZE_LEN
        self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED

        self._connections = dict()  # key: ID; data: Connection()
        self._connection_by_conn = dict()  # key: conn; data: ID
        self._connections_id_gen = IDGenerator()

        self._connections_marked_as_ready_to_be_closed = set()
        self._connections_marked_to_be_closed_immediately = set()
        self._connections_marked_as_ready_to_be_deleted = set()

        self._unconfirmed_clients = set()

        self._we_have_connections_for_select = False
        self._input_check_sockets = set()
        self._output_check_sockets = set()
        self._exception_check_sockets = set()

        # IDs (GUIDs) and other information about the clients whose connection is explicitly expected
        self._expected_clients = dict()  # key: ID; data: Client()
        self._expected_clients_id_gen = IDGenerator()
        self._keywords_for_expected_clients = dict()  # key: keyword; data: info
        self._conns_of_expected_clients = dict()  # key: conn; data expected_client_ID

        self.unexpected_clients_are_allowed = True

        # Unidentified and unexpected clients. If a client has reported its GUID and someone later adds this GUID to
        # the list of expected clients, that client will be picked up automatically.
        self._unexpected_clients = dict()  # key: ID; data: Client()
        self._unexpected_clients_id_gen = IDGenerator()
        self._keywords_of_unexpected_clients = dict()
        self._conns_of_unexpected_clients = dict()

        self._io_iteration_result = IoIterationResult()

        self.raw_checker_for_new_incoming_connections = CheckIsRawConnection()
        self.need_to_auto_check_incoming_raw_connection = False
        self.unknown_clients_are_allowed = False
        self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string)
        self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: '

        self.echo_log = False
        self._internal_log = deque()

        self.recv_sizes = deque()
        self.recv_buff_sizes = deque()

        self.should_get_client_addr_info_on_connection = True

        self.use_nodelay_inet = False
        self.use_speed_optimized_socket_read = False

        self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \
            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
        self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \
            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
        self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \
            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)

        self.class_for_unknown_clients_inline_processing = None

        # IDs of expected clients whose inline processor parameters io_iteration() has to apply
        self._clients_with_inline_processors_that_need_to_apply_parameters = set()
 386
 387    @staticmethod
 388    def check_sockets_select(read: Set[int], write: Set[int], error: Set[int],
 389                             timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
 390        all_sockets = read | write | error
 391        if all_sockets:
 392            return select.select(read,
 393                                 write,
 394                                 error,
 395                                 timeout)
 396        else:
 397            return set(), set(), set()
 398
 399    def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int],
 400                           timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
 401        read_events = select.POLLIN | select.POLLPRI
 402        write_events = select.POLLOUT
 403        except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL
 404        if hasattr(select, 'POLLRDHUP'):
 405            except_events |= select.POLLRDHUP
 406        readable_events = {select.POLLIN, select.POLLPRI}
 407        writable_events = {select.POLLOUT}
 408        exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL}
 409        if hasattr(select, 'POLLRDHUP'):
 410            exceptional_events.add(select.POLLRDHUP)
 411        all_events_set = readable_events | writable_events | exceptional_events
 412
 413        timeout = int(timeout * 1000)
 414
 415        # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets))
 416        all_sockets = read | write | error
 417        # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets))
 418        new_sockets = all_sockets - self.last_all_sockets
 419        # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets))
 420        still_sockets = all_sockets & self.last_all_sockets
 421        # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets))
 422        deleted_sockets = self.last_all_sockets - all_sockets
 423        # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets))
 424        self.last_all_sockets = all_sockets
 425
 426        for socket_fd in new_sockets:
 427            event_mask = 0
 428            if socket_fd in read:
 429                event_mask |= read_events
 430            if socket_fd in write:
 431                event_mask |= write_events
 432            if socket_fd in error:
 433                event_mask |= except_events
 434            # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
 435            self.po.register(socket_fd, event_mask)
 436
 437        for socket_fd in still_sockets:
 438            event_mask = 0
 439            if socket_fd in read:
 440                event_mask |= read_events
 441            if socket_fd in write:
 442                event_mask |= write_events
 443            if socket_fd in error:
 444                event_mask |= except_events
 445            # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
 446            self.po.modify(socket_fd, event_mask)
 447
 448        for socket_fd in deleted_sockets:
 449            # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd))
 450            self.po.unregister(socket_fd)
 451
 452        poll_result = self.po.poll(timeout)
 453        # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result))
 454        # sys.stdout.flush()
 455
 456        readable = set()
 457        writable = set()
 458        exceptional = set()
 459        for socket_fd, event_mask in poll_result:
 460            socket_events_set = set()
 461            for another_event in all_events_set:
 462                if event_mask & another_event:
 463                    socket_events_set.add(another_event)
 464
 465            if socket_events_set & readable_events:
 466                readable.add(socket_fd)
 467            if socket_events_set & writable_events:
 468                writable.add(socket_fd)
 469            if socket_events_set & exceptional_events:
 470                exceptional.add(socket_fd)
 471
 472        return readable, writable, exceptional
 473
 474    def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket],
 475                      timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]:
 476        all_sockets = read | write | error
 477        if all_sockets:
 478            read_fd = set()
 479            write_fd = set()
 480            error_fd = set()
 481            for conn in read:
 482                read_fd.add(conn.fileno())
 483            for conn in write:
 484                write_fd.add(conn.fileno())
 485            for conn in error:
 486                error_fd.add(conn.fileno())
 487
 488            check_sockets = self.check_sockets_select
 489            if os.name != 'nt':
 490                check_sockets = self.check_sockets_poll
 491
 492            readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd,
 493                                                                     write_fd,
 494                                                                     error_fd,
 495                                                                     timeout)
 496            readable = set()
 497            writable = set()
 498            exceptional = set()
 499            for fd in readable_fd:
 500                readable.add(self.socket_by_fd[fd])
 501            for fd in writable_fd:
 502                writable.add(self.socket_by_fd[fd])
 503            for fd in exceptional_fd:
 504                exceptional.add(self.socket_by_fd[fd])
 505            return readable, writable, exceptional
 506        else:
 507            return set(), set(), set()
 508
 509    def gate_io_iteration(self, timeout=0.0):
 510        result = self._io_iteration_result
 511        if self._gate:
 512            readable, writable, exceptional = self.check_sockets_select(self._gate,
 513                                                                        set(),
 514                                                                        set(),
 515                                                                        timeout)
 516
 517            # Handle inputs
 518            for s in readable:
 519                self._read_data_from_socket(s)
 520
 521        self._io_iteration_result = IoIterationResult()
 522        return result
 523
 524    # @profile
    def io_iteration(self, timeout=0.0):
        """
        Run one I/O iteration: check the sockets, read input, write output, handle socket errors, apply pending
        inline processor parameters, close the connections marked for immediate closing and remove the connections
        marked for deletion.

        Nothing except the result swap is done while ``self._we_have_connections_for_select`` is falsy.

        :param timeout: timeout in seconds
        :return: the IoIterationResult object that was current when the call started; a fresh one is installed
            before returning
        """
        result = self._io_iteration_result

        if self._we_have_connections_for_select:
            # need_to_process = False
            # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets
            # if not (all_sockets - self._gate):
            #     timeout = 0.01

            need_to_repeat = True

            while need_to_repeat:
                output_check_sockets = set()

                # Is need to check writable sockets
                # (only when at least one connection actually has queued output)
                need_to_check_writable_sockets = False
                for s in self._output_check_sockets:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    if curr_client_info.output_to_client.size():
                        need_to_check_writable_sockets = True
                        break

                if need_to_check_writable_sockets:
                    output_check_sockets = self._output_check_sockets

                # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(),
                #                                                    len(self._input_check_sockets),
                #                                                    len(self._output_check_sockets),
                #                                                    len(self._exception_check_sockets)))
                # sys.stdout.flush()
                check_sockets_start_time = time.perf_counter()
                readable, writable, exceptional = self.check_sockets(self._input_check_sockets,
                                                                     output_check_sockets,
                                                                     self._exception_check_sockets,
                                                                     timeout)
                check_sockets_finish_time = time.perf_counter()
                # Accumulate timing statistics of the check_sockets() call
                check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time
                self.check_sockets_sum_time += check_sockets_delta_time
                self.check_sockets_qnt += 1
                if self.check_sockets_max_time < check_sockets_delta_time:
                    self.check_sockets_max_time = check_sockets_delta_time
                # Used only by the commented-out debug output below
                check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt
                # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format(
                #     check_sockets_delta_time,
                #     check_socket_average_time,
                #     self.check_sockets_sum_time,
                #     self.check_sockets_max_time
                # ))
                # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(),
                #                                                    len(readable),
                #                                                    len(writable),
                #                                                    len(exceptional)))
                # sys.stdout.flush()

                # Reading is allowed only while the non-deletable input data fits into the input size limit
                read_is_forbidden = True
                if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \
                        <= self.global_in__data_size_limit.result:
                    read_is_forbidden = False

                    # Handle inputs
                    for s in readable:
                        read_result = self._read_data_from_socket(s)
                        if read_result:
                            if s in self._unconfirmed_clients:
                                self._process_client_keyword(s)
                                self._check_is_client_have_data_to_read_in_fifo(s)
                            else:
                                self._client_have_data_to_read_in_fifo(s)

                if __debug__:
                    read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
                        read_is_forbidden)
                    if read_is_forbidden_test is not None:
                        if read_is_forbidden_test:
                            print('Read is suppressed until data will be processed.')
                        else:
                            print('Read is allowed: data is processed.')

                # Handle outputs
                for s in writable:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    self._write_data_to_socket(curr_client_info)
                    # self._write_data_to_socket(s)

                # Handle "exceptional conditions"
                for s in exceptional:
                    self._handle_connection_error(s)

                # Set parameters for inline processors
                if self._clients_with_inline_processors_that_need_to_apply_parameters:
                    for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters:
                        expected_client_info = self._expected_clients[ec_id]
                        # Direct access to the name-mangled private attribute of Client
                        connection_info = expected_client_info._Client__connection
                        self._inline_processor__apply_parameters(connection_info, expected_client_info)
                    self._clients_with_inline_processors_that_need_to_apply_parameters.clear()

                # Close sockets
                if self._connections_marked_to_be_closed_immediately:
                    # Swap the set for a fresh one before iterating over the old one
                    sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately
                    self._connections_marked_to_be_closed_immediately = set()
                    for closeable_socket in sockets_should_be_closed_immediately:
                        connection_id = self._connection_by_conn[closeable_socket]
                        # self.close_connection_by_conn(closeable_socket)
                        self.close_connection(connection_id)
                        self._inline_processor__on__connection_lost_by_connection_id(connection_id)

                # Removing clients
                if self._connections_marked_as_ready_to_be_deleted:
                    # Swap the set for a fresh one before iterating over the old one
                    clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted
                    self._connections_marked_as_ready_to_be_deleted = set()
                    for faulty_socket in clients_ready_to_be_deleted:
                        self.remove_connection_by_conn(faulty_socket)

                if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \
                        <= self.global_out__data_size_limit.result:
                    need_to_repeat = False
                else:
                    need_to_repeat = True

                # NOTE(review): this unconditional assignment overrides the output-limit decision made just above,
                # so the `while` body runs exactly once per call and the "Work is suppressed" message below can
                # never be triggered with True. Confirm whether this is intentional.
                need_to_repeat = False

                if __debug__:
                    need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger(
                        need_to_repeat)
                    if need_to_repeat_show is not None:
                        if need_to_repeat_show:
                            print('Work is suppressed until data will be out.')
                        else:
                            print('Work is allowed: data is out.')

        self._io_iteration_result = IoIterationResult()
        return result
 662
 663    def listen(self, backlog=1):
 664        # backlog = backlog or 1
 665
 666        new_connection_settings = set()
 667        for gate_connection_settings in self.gates_connections_settings:
 668            gate = None
 669            try:
 670                gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type,
 671                                     gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno)
 672                self.socket_by_fd[gate.fileno()] = gate
 673                gate.setblocking(0)
 674                # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 675            except (socket.error, OSError) as err:
 676                gate = None
 677                if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format(
 678                    err.errno, err.strerror))
 679                continue
 680
 681            if self.reuse_gate_port:
 682                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
 683
 684            try:
 685                self._check_for_initial_af_unix_socket_unlink(gate_connection_settings)
 686                gate.bind(gate_connection_settings.socket_address)
 687            except (socket.error, OSError) as err:
 688                del self.socket_by_fd[gate.fileno()]
 689                gate.close()
 690                gate = None
 691                if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format(
 692                    gate_connection_settings.socket_address, err.errno, err.strerror))
 693                continue
 694            try:
 695                gate.listen(backlog)
 696            except (socket.error, OSError) as err:
 697                del self.socket_by_fd[gate.fileno()]
 698                gate.close()
 699                gate = None
 700                if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format(
 701                    gate_connection_settings.socket_address, err.errno, err.strerror))
 702                continue
 703
 704            if self.reuse_gate_addr:
 705                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 706
 707            self._input_check_sockets.add(gate)
 708            self._exception_check_sockets.add(gate)
 709
 710            if gate:
 711                self._gate.add(gate)
 712                if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS:
 713                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0])
 714                elif socket.AF_UNIX == gate_connection_settings.socket_family:
 715                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
 716                else:
 717                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
 718                    self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY')
 719                self._connection_settings_by_gate_conn[gate] = gate_connection_settings
 720                self._we_have_connections_for_select = True
 721                new_connection_settings.add(gate_connection_settings)
 722            else:
 723                self.faulty_connection_settings.add(gate_connection_settings)
 724        self.gates_connections_settings = new_connection_settings
 725
 726        return len(self.gates_connections_settings)
 727
 728    def close_all_connections(self):
 729        if __debug__: self._log('CLOSE ALL CONNECTIONS:')
 730        clients_list = dict(self._connections)
 731        for connection_id, client_info in clients_list.items():
 732            self.close_connection(connection_id)
 733
 734    def remove_all_connections(self):
 735        clients_list = dict(self._connections)
 736        for connection_id, client_info in clients_list.items():
 737            self.remove_connection(connection_id)
 738
 739    def close(self):
 740        for gate in self._gate:
 741            del self.socket_by_fd[gate.fileno()]
 742            gate.close()
 743
 744            if gate in self._input_check_sockets:
 745                self._input_check_sockets.remove(gate)
 746            if gate in self._exception_check_sockets:
 747                self._exception_check_sockets.remove(gate)
 748
 749            if not self._input_check_sockets:
 750                self._we_have_connections_for_select = False
 751        self._unlink_good_af_unix_sockets()
 752
 753    def destroy(self):
 754        self.close()
 755        self.close_all_connections()
 756        self.remove_all_connections()
 757
 758    def add_client(self, expected_client_info: Client):
 759        """
 760        Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в
 761        будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте.
 762        При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет
 763        зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления.
 764        Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий -
 765        перед закрытием и уничтожением сервера) цикл обработки io_iteration().
 766        :param expected_client_info: link to Client()
 767        :return: expected_client_id
 768        """
 769        if (expected_client_info.connection_settings.keyword is None) \
 770                and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role):
 771            raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!')
 772
 773        if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients:
 774            raise Exception('Expected Client with keyword "{}" is already registered!'.format(
 775                expected_client_info.connection_settings.keyword))
 776
 777        expected_client_info.id = self._expected_clients_id_gen()
 778
 779        if self.unexpected_clients_are_allowed:
 780            if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients:
 781                # клиент уже подключен
 782                unexpected_client_id = self._keywords_of_unexpected_clients[
 783                    expected_client_info.connection_settings.keyword]
 784                unexpected_client_info = self._unexpected_clients[unexpected_client_id]
 785                connection_info = expected_client_info._Client__connection
 786                if (
 787                    unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \
 788                        (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role):
 789                    # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже
 790                    # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении
 791                    # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся
 792                    # подключение.
 793                    # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически
 794                    # отключен и соединение будет установлено в новом сокете.
 795                    expected_client_info.connection_id = unexpected_client_info.connection_id
 796                    expected_client_info._Client__connection = \
 797                        unexpected_client_info._Client__connection
 798                    self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id
 799                    connection_info.connected_expected_client_id = expected_client_info.id
 800                    connection_info.connected_expected_client = expected_client_info
 801                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id)
 802                else:
 803                    # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже
 804                    # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером.
 805                    # Необходимо его отключить.
 806                    self._mark_connection_to_be_closed_immediately(connection_info)
 807                    self._mark_connection_as_ready_for_deletion(connection_info)
 808                self._remove_unexpected_client(unexpected_client_id)
 809            else:
 810                # клиент еще не подключен
 811                expected_client_info.connection_id = None
 812                expected_client_info._Client__connection = None
 813
 814        if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role:
 815            self._connect_to_super_server(expected_client_info)
 816
 817        self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id
 818        self._expected_clients[expected_client_info.id] = expected_client_info
 819
 820        return expected_client_info.id
 821
 822    def get_client_id_by_keyword(self, expected_client_keyword):
 823        """
 824        :param expected_client_keyword: expected_client_keyword
 825        :return: link to Client()
 826        """
 827        return self._keywords_for_expected_clients[expected_client_keyword]
 828
 829    def get_client_info(self, expected_client_id):
 830        """
 831        :param expected_client_id: expected_client_id
 832        :return: link to Client()
 833        """
 834        return self._expected_clients[expected_client_id]
 835
 836    def get_connection_input_fifo_size_for_client(self, expected_client_id):
 837        expected_client_info = self._expected_clients[expected_client_id]
 838        connection_info = expected_client_info._Client__connection
 839        if connection_info is None:
 840            raise Exception('Expected client was not connected yet!')
 841        # if client_info.this_is_raw_connection:
 842        #     if client_info.input_from_client.size():
 843        #         return 1
 844        #     else:
 845        #         return 0
 846        # else:
 847        #     return client_info.input_from_client.size()
 848        return connection_info.input_from_client.size()
 849
 850    def get_message_from_client(self, expected_client_id):
 851        expected_client_info = self._expected_clients[expected_client_id]
 852        connection_info = expected_client_info._Client__connection
 853        if connection_info is None:
 854            raise Exception('Expected client was not connected yet!')
 855        if not connection_info.input_from_client.size():
 856            raise Exception('There is no readable data in expected client\'s FIFO!')
 857        # if client_info.this_is_raw_connection:
 858        #     self._consolidate_raw_messages_in_input_from_client_fifo(client_info)
 859        return connection_info.input_from_client.get()
 860
 861    # @profile
 862    def get_messages_from_client(self, expected_client_id):
 863        expected_client_info = self._expected_clients[expected_client_id]
 864        connection_info = expected_client_info._Client__connection
 865        if connection_info is None:
 866            raise Exception('Expected client was not connected yet!')
 867        try:
 868            while True:
 869                yield connection_info.input_from_client.get()
 870        except FIFOIsEmpty:
 871            pass
 872            # while client_info.input_from_client.size():
 873            #     yield client_info.input_from_client.get()
 874
 875    def get_connection_output_fifo_size_for_client(self, expected_client_id):
 876        expected_client_info = self._expected_clients[expected_client_id]
 877        connection_info = expected_client_info._Client__connection
 878        if connection_info is None:
 879            raise Exception('Expected client was not connected yet!')
 880        return connection_info.output_to_client.size()
 881
 882    def send_message_to_client(self, expected_client_id, data):
 883        # data = bytes(data)
 884        expected_client_info = self._expected_clients[expected_client_id]
 885        connection_info = expected_client_info._Client__connection
 886        if connection_info is None:
 887            raise Exception('Expected client was not connected yet!')
 888        if connection_info.this_is_raw_connection:
 889            self._send_message_through_connection_raw(connection_info, data)
 890        else:
 891            self._send_message_through_connection(connection_info, data)
 892
 893    def send_messages_to_client(self, expected_client_id, messages_list):
 894        # data = bytes(data)
 895        expected_client_info = self._expected_clients[expected_client_id]
 896        connection_info = expected_client_info._Client__connection
 897        if connection_info is None:
 898            raise Exception('Expected client was not connected yet!')
 899        if connection_info.this_is_raw_connection:
 900            self._send_messages_through_connection_raw(connection_info, messages_list)
 901        else:
 902            self._send_messages_through_connection(connection_info, messages_list)
 903
 904    def check_is_client_is_in_raw_connection_mode(self, expected_client_id):
 905        expected_client_info = self._expected_clients[expected_client_id]
 906        connection_info = expected_client_info._Client__connection
 907        if connection_info is None:
 908            raise Exception('Expected client was not connected yet!')
 909        return connection_info.this_is_raw_connection
 910
 911    def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool):
 912        expected_client_info = self._expected_clients[expected_client_id]
 913        connection_info = expected_client_info._Client__connection
 914        if connection_info is None:
 915            raise Exception('Expected client was not connected yet!')
 916        connection_info.this_is_raw_connection = is_raw
 917
 918    def set_inline_processor_for_client(self, expected_client_id,
 919                                        class_for_unknown_clients_inline_processing: type):
 920        expected_client_info = self._expected_clients[expected_client_id]
 921        connection_info = expected_client_info._Client__connection
 922        if connection_info is None:
 923            raise Exception('Expected client was not connected yet!')
 924        self._set_inline_processor_for_client(connection_info, expected_client_info,
 925                                              class_for_unknown_clients_inline_processing)
 926
 927    def close_client_connection(self, expected_client_id, raise_if_already_closed=True):
 928        """
 929        Connection will be closed immediately (inside this method)
 930        :param expected_client_id:
 931        :param raise_if_already_closed:
 932        :return:
 933        """
 934        if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:')
 935        expected_client_info = self._expected_clients[expected_client_id]
 936        connection_info = expected_client_info._Client__connection
 937        if raise_if_already_closed and (connection_info is None):
 938            raise Exception('Expected client was not connected yet!')
 939        self.close_connection(connection_info.id)
 940
 941    def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id,
 942                                                               raise_if_already_closed=True):
 943        """
 944        Connection will be closed immediately (inside main IO loop)
 945        :param expected_client_id:
 946        :param raise_if_already_closed:
 947        :return:
 948        """
 949        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:')
 950        expected_client_info = self._expected_clients[expected_client_id]
 951        connection_info = expected_client_info._Client__connection
 952        if raise_if_already_closed and (connection_info is None):
 953            raise Exception('Expected client was not connected yet!')
 954        self._mark_connection_to_be_closed_immediately(connection_info)
 955
 956    def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True):
 957        """
 958        Connection will be closed when all output will be sent (inside main IO loop).
 959        :param expected_client_id:
 960        :param raise_if_already_closed:
 961        :return:
 962        """
 963        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:')
 964        expected_client_info = self._expected_clients[expected_client_id]
 965        connection_info = expected_client_info._Client__connection
 966        if raise_if_already_closed and (connection_info is None):
 967            raise Exception('Expected client was not connected yet!')
 968        self._mark_connection_as_ready_to_be_closed(connection_info)
 969
 970    def remove_client(self, expected_client_id):
 971        if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id))
 972        expected_client_info = self._expected_clients[expected_client_id]
 973        if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword))
 974        connection_id = expected_client_info.connection_id
 975        if connection_id is None:
 976            self._remove_client(expected_client_id)
 977        self.remove_connection(connection_id)
 978
 979    def add_connection(self, conn, address):
 980        """
 981        :param conn: socket
 982        :param address: address
 983        :return: client ID
 984        """
 985        if conn is None:
 986            raise TypeError('conn should not be None!')
 987
 988        self.socket_by_fd[conn.fileno()] = conn
 989        conn.setblocking(0)
 990        if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS):
 991            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 992
 993        new_client_id = self._connections_id_gen()
 994
 995        client_info = Connection(new_client_id, (conn, address), self)
 996        self._connections[new_client_id] = client_info
 997        self._connection_by_conn[conn] = new_client_id
 998        self._input_check_sockets.add(conn)
 999        self._exception_check_sockets.add(conn)
1000        self._we_have_connections_for_select = True
1001
1002        self._unconfirmed_clients.add(conn)
1003
1004        return new_client_id
1005
1006    def check_connection_existance(self, connection_id):
1007        if connection_id not in self._expected_clients:
1008            return False
1009        if connection_id not in self._connections:
1010            return False
1011        client_info = self._connections[connection_id]
1012        if not client_info.conn.existence:
1013            return False
1014        conn = client_info.conn.result
1015        if conn is None:
1016            return False
1017        return True
1018
1019    def close_connection(self, connection_id):
1020        if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id))
1021        client_info = self._connections[connection_id]
1022        if not client_info.conn.existence:
1023            if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id))
1024            return
1025        conn = client_info.conn.result
1026        if conn is None:
1027            if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id))
1028            return
1029
1030        del self.socket_by_fd[conn.fileno()]
1031        conn.close()
1032        client_info.conn.existence = False
1033        client_info.output_to_client = copy.copy(client_info.output_to_client)  # clear all output data to free some
1034        #   memory even before destroying
1035
1036        if connection_id in self._connections_marked_as_ready_to_be_closed:
1037            self._connections_marked_as_ready_to_be_closed.remove(connection_id)
1038        if conn in self._connections_marked_to_be_closed_immediately:
1039            self._connections_marked_to_be_closed_immediately.remove(conn)
1040        if conn in self._connection_by_conn:
1041            del self._connection_by_conn[conn]
1042        if conn in self._input_check_sockets:
1043            self._input_check_sockets.remove(conn)
1044        if conn in self._output_check_sockets:
1045            self._output_check_sockets.remove(conn)
1046        if conn in self._exception_check_sockets:
1047            self._exception_check_sockets.remove(conn)
1048
1049        if not self._input_check_sockets:
1050            self._we_have_connections_for_select = False
1051
1052        if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id))
1053
1054    def close_connection_by_conn(self, conn):
1055        # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета.
1056        # И мы сможем обнаружить наличие соответствующей ошибки в коде.
1057        if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn)))
1058        connection_id = self._connection_by_conn[conn]
1059        if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id))
1060        self.close_connection(connection_id)
1061
1062    def remove_connection(self, connection_id):
1063        # client should NOT be removed immediately after connection close (close_connection):
1064        # code should do it by itself after reading all available input data
1065        if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id))
1066        client_info = self._connections[connection_id]
1067        if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword))
1068        if client_info.conn.existence:
1069            self.close_connection(connection_id)
1070        conn = client_info.conn.result
1071        if conn is None:
1072            return
1073
1074        if conn in self._conns_of_unexpected_clients:
1075            self._remove_unexpected_client(self._conns_of_unexpected_clients[conn])
1076
1077        # if conn in self._conns_of_expected_clients:
1078        #     self._remove_client(self._conns_of_expected_clients[conn])
1079        if client_info.connected_expected_client_id is not None:
1080            self._remove_client(client_info.connected_expected_client_id)
1081
1082        client_info.connected_expected_client_id = None
1083        client_info.connected_expected_client = None
1084
1085        if connection_id in self._connections_marked_as_ready_to_be_deleted:
1086            self._connections_marked_as_ready_to_be_deleted.remove(connection_id)
1087
1088        client_info.conn.existence = False
1089        client_info.conn.result = None
1090
1091        del self._connections[connection_id]
1092        if conn in self._connection_by_conn:
1093            del self._connection_by_conn[conn]
1094        if conn in self._input_check_sockets:
1095            self._input_check_sockets.remove(conn)
1096        if conn in self._output_check_sockets:
1097            self._output_check_sockets.remove(conn)
1098        if conn in self._exception_check_sockets:
1099            self._exception_check_sockets.remove(conn)
1100
1101            # client_info.remove()
1102
1103    def remove_connection_by_conn(self, conn):
1104        connection_id = self._connection_by_conn[conn]
1105        self.remove_connection(connection_id)
1106
1107    def _log(self, log_string):
1108        self._internal_log.append(log_string)
1109        if self.echo_log:
1110            print(log_string)
1111            sys.stdout.flush()
1112
1113    def _create_unknown_client_from_connection(self, client_info: Connection):
1114        keyword = None
1115        keyword_is_ok = False
1116        while not keyword_is_ok:
1117            keyword = self.prefix_for_unknown_client_keywords + self._unknown_clients_keyword_gen().encode()
1118            if keyword not in self._keywords_for_expected_clients:
1119                keyword_is_ok = True
1120        connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client,
1121                                                 socket_address=client_info.addr.result, keyword=keyword)
1122        expected_client_info = Client(connection_settings)
1123        expected_client_info.id = self._expected_clients_id_gen()
1124
1125        expected_client_info.connection_id = client_info.id
1126        expected_client_info._Client__connection = client_info
1127        expected_client_info.will_use_raw_client_connection = True
1128        expected_client_info.will_use_raw_connection_without_handshake = True
1129        expected_client_info.this_is_unknown_client = True
1130        if self.class_for_unknown_clients_inline_processing is not None:
1131            self._set_inline_processor_for_client(client_info, expected_client_info,
1132                                                  self.class_for_unknown_clients_inline_processing)
1133
1134        self._conns_of_expected_clients[client_info.conn.result] = expected_client_info.id
1135        client_info.connected_expected_client_id = expected_client_info.id
1136        client_info.connected_expected_client = expected_client_info
1137        self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id
1138        self._expected_clients[expected_client_info.id] = expected_client_info
1139
1140        self._io_iteration_result.newly_connected_unknown_clients.add(expected_client_info.id)
1141
1142        return expected_client_info.id
1143
1144    def _set_inline_processor_for_client(self, connection_info: Connection,
1145                                         expected_client_info: Client,
1146                                         class_for_unknown_clients_inline_processing: type):
1147        assert type(class_for_unknown_clients_inline_processing) == type, \
1148            '.class_for_unknown_clients_inline_processing must be a class or None'
1149        keyword = expected_client_info.connection_settings.keyword
1150        expected_client_info.obj_for_inline_processing = class_for_unknown_clients_inline_processing(
1151            expected_client_info.id, keyword, connection_info.conn.result.family, connection_info.conn.result.type,
1152            connection_info.conn.result.proto, copy.copy(connection_info.addr_info),
1153            copy.copy(connection_info.host_names), self._clients_with_inline_processors_that_need_to_apply_parameters
1154        )
1155        connection_info.has_inline_processor = True
1156        self._inline_processor__init_parameters(connection_info, expected_client_info)
1157
1158    def _connect_to_super_server(self, expected_client_info: Client):
1159        """
1160        Подключение происходит в блокируещем режиме (неблокирующий режим включается позже - в методе add_connection()).
1161        Для реализации неблокирующего режима надо оттестировать текущий код и ввести дополнительную неблокирующую
1162        логику через select/poll/epoll:
1163        http://man7.org/linux/man-pages/man2/connect.2.html
1164        :param expected_client_info:
1165        :return:
1166        """
1167        connection_settings = expected_client_info.connection_settings
1168        conn = None
1169        try:
1170            conn = socket.socket(connection_settings.socket_family, connection_settings.socket_type,
1171                                 connection_settings.socket_protocol, connection_settings.socket_fileno)
1172            self.socket_by_fd[conn.fileno()] = conn
1173            # conn.setblocking(0)
1174        except (socket.error, OSError) as err:
1175            if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CREATE SOCKET: {}, {}'.format(
1176                err.errno, err.strerror))
1177            raise
1178
1179        try:
1180            conn.connect(connection_settings.socket_address)
1181        except (TimeoutError, socket.error, OSError) as err:
1182            # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout
1183            del self.socket_by_fd[conn.fileno()]
1184            conn.close()
1185            if __debug__: self._log('EXCEPTION: SUPER SERVER: CONNECT TO: CONNECT:"{}", {}, {}'.format(
1186                connection_settings.socket_address, err.errno, err.strerror))
1187            raise
1188
1189        super_server_client_id = self.add_connection(conn, connection_settings.socket_address)
1190
1191        addr_info = host_names = None
1192        try:
1193            if self.should_get_client_addr_info_on_connection and (conn.family in INET_TYPE_CONNECTIONS):
1194                addr_info = socket.getaddrinfo(connection_settings.socket_address[0],
1195                                               connection_settings.socket_address[1])
1196                host_names = socket.gethostbyaddr(connection_settings.socket_address[0])
1197        except ConnectionError as err:
1198            # An established connection was aborted by the software in your host machine
1199            if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1200            if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1201                connection_settings.socket_address, err.errno, err.strerror))
1202            self._mark_connection_to_be_closed_immediately(super_server_client_id)
1203            ok = False
1204        except (socket.error, OSError) as err:
1205            if __debug__: self._log('EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1206                connection_settings.socket_address, err.errno, err.strerror))
1207            if err.errno in SET_OF_CONNECTION_ERRORS:
1208                # An established connection was aborted by the software in your host machine
1209                if __debug__: self._log(
1210                    'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1211                if __debug__: self._log(
1212                    'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1213                        connection_settings.socket_address, err.errno, err.strerror))
1214                self._mark_connection_to_be_closed_immediately(super_server_client_id)
1215                ok = False
1216            else:
1217                if 'nt' == os.name:
1218                    if errno.WSAECONNRESET == err.errno:
1219                        # An existing connection was forcibly closed by the remote host
1220                        if __debug__: self._log(
1221                            'CLOSING {}: Connection reset by peer'.format(connection_settings.socket_address))
1222                        if __debug__: self._log(
1223                            'EXCEPTION: CONNECT TO SUPER SERVER: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1224                                connection_settings.socket_address, err.errno, err.strerror))
1225                        self._mark_connection_to_be_closed_immediately(super_server_client_id)
1226                        ok = False
1227                    else:
1228                        raise err
1229                else:
1230                    raise err
1231
1232        super_server_client_info = self._connections[super_server_client_id]
1233        super_server_client_info.addr_info = addr_info
1234        super_server_client_info.host_names = host_names
1235        self._log_new_connection(super_server_client_info, False)
1236
1237        if expected_client_info.will_use_raw_connection_without_handshake:
1238            # Connection is made without handshake
1239            super_server_client_info.this_is_raw_connection = True
1240            self._unconfirmed_clients.remove(super_server_client_info.conn.result)
1241            self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id)
1242        else:
1243            keyword = expected_client_info.connection_settings.keyword
1244            if keyword is None:
1245                keyword = self._get_own_keyword_appropriate_for_connection(conn)
1246            if keyword is None:
1247                # Если ключевое слово не предоставлено - клиент будет помечен для закрытия и удаления
1248                self._mark_connection_to_be_closed_immediately(super_server_client_info)
1249                self._mark_connection_as_ready_for_deletion(super_server_client_info)
1250                raise Exception('Own keyword should be provided in connection_settings!')
1251            if keyword:
1252                # Если ключевое слово предоставлено (даже если оно путое типа b'' - оно будет отправлено супер-серверу).
1253                self._send_message_through_connection(super_server_client_info, keyword)
1254
1255        expected_client_info.connection_id = super_server_client_id
1256        expected_client_info._Client__connection = super_server_client_info
1257        self._conns_of_expected_clients[conn] = expected_client_info.id
1258        super_server_client_info.connected_expected_client_id = expected_client_info.id
1259        super_server_client_info.connected_expected_client = expected_client_info
1260
1261    def _get_own_keyword_appropriate_for_connection(self, conn):
1262        keyword = None
1263        random_keyword = None
1264        for gate_connection_settings in self.gates_connections_settings:
1265            random_keyword = gate_connection_settings.keyword
1266            if (conn.family == gate_connection_settings.socket_family) and (
1267                conn.type == gate_connection_settings.socket_type) and \
1268                    (conn.proto == gate_connection_settings.socket_protocol):
1269                keyword = gate_connection_settings.keyword
1270        if keyword is None:
1271            keyword = random_keyword
1272        return keyword
1273
1274    def _pack_message(self, data):
1275        return len(data).to_bytes(self.message_size_len, 'little') + data
1276
1277    def _remove_client(self, expected_client_id):
1278        expected_client_info = self._expected_clients[expected_client_id]
1279        del self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword]
1280        del self._expected_clients[expected_client_id]
1281        connection_info = expected_client_info._Client__connection
1282        if connection_info is not None:
1283            # you can remove expected client before it will be connected to the server
1284            del self._conns_of_expected_clients[connection_info.conn.result]
1285        expected_client_info.connection_id = None
1286        expected_client_info._Client__connection = None
1287
1288    def _add_unexpected_client(self, connection_id, keyword):
1289        connection_settings = ConnectionSettings(direction_role=ConnectionDirectionRole.client, keyword=keyword)
1290        unexpected_client_info = Client(connection_settings, self._unexpected_clients_id_gen(),
1291                                        connection_id)
1292
1293        self._unexpected_clients[unexpected_client_info.id] = unexpected_client_info
1294        self._keywords_of_unexpected_clients[keyword] = unexpected_client_info.id
1295        connection_info = unexpected_client_info._Client__connection
1296        self._conns_of_unexpected_clients[connection_info.conn.result] = unexpected_client_info.id
1297
1298        return unexpected_client_info.id
1299
1300    def _remove_unexpected_client(self, unexpected_client_id):
1301        unexpected_client_info = self._unexpected_clients[unexpected_client_id]
1302        connection_info = unexpected_client_info._Client__connection  # unexpected client appears only after
1303        #   connection
1304        del self._keywords_of_unexpected_clients[unexpected_client_info.connection_settings.keyword]
1305        del self._unexpected_clients[unexpected_client_id]
1306        del self._conns_of_unexpected_clients[connection_info.conn.result]
1307        unexpected_client_info.connection_id = None
1308        unexpected_client_info._Client__connection = None
1309
1310    # @profile
1311    def _accept_new_connection(self, readable_socket: socket.socket):
1312        # One of a "readable" self._gate sockets is ready to accept a connection
1313        ok = True
1314        while ok:
1315            connection_id = None
1316            client_info = None
1317
1318            connection = None
1319            client_address = None
1320            try:
1321                connection, client_address = readable_socket.accept()
1322                self.socket_by_fd[connection.fileno()] = connection
1323                connection_id = self.add_connection(connection, client_address)
1324                client_info = self._connections[connection_id]
1325            except BlockingIOError as err:
1326                ok = False
1327            except InterruptedError as err:
1328                pass
1329            except (socket.error, OSError) as err:
1330                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
1331                    ok = False
1332                elif errno.EINTR == err.errno:
1333                    pass
1334                elif errno.ECONNABORTED == err.errno:
1335                    ok = False
1336                elif errno.EMFILE == err.errno:
1337                    if __debug__: self._log(
1338                        'The per-process limit on the number of open file descriptors had been reached.')
1339                    ok = False
1340                elif errno.ENFILE == err.errno:
1341                    if __debug__: self._log(
1342                        'The system-wide limit on the total number of open files had been reached.')
1343                    ok = False
1344                elif (errno.ENOBUFS == err.errno) or (errno.ENOMEM == err.errno):
1345                    if __debug__: self._log(
1346                        'Not enough free memory. Allocation is limited by the socket buffer limits.')
1347                    ok = False
1348                elif errno.EPROTO == err.errno:
1349                    if __debug__: self._log('Protocol error.')
1350                    ok = False
1351                elif errno.EPERM == err.errno:
1352                    if __debug__: self._log('Firewall rules forbid connection.')
1353                    ok = False
1354                else:
1355                    raise err
1356
1357            if (connection is not None) and (client_info is not None):
1358                addr_info = host_names = None
1359                try:
1360                    if self.should_get_client_addr_info_on_connection and \
1361                            (connection.family in INET_TYPE_CONNECTIONS):
1362                        addr_info = socket.getaddrinfo(client_address[0], client_address[1])
1363                        host_names = socket.gethostbyaddr(client_address[0])
1364                except ConnectionError as err:
1365                    # An established connection was aborted by the software in your host machine
1366                    if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address))
1367                    if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1368                        client_address, err.errno, err.strerror))
1369                    self._mark_connection_to_be_closed_immediately(client_info)
1370                    ok = False
1371                except (socket.error, OSError) as err:
1372                    if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1373                        client_address, err.errno, err.strerror))
1374                    if err.errno in SET_OF_CONNECTION_ERRORS:
1375                        # An established connection was aborted by the software in your host machine
1376                        if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(client_address))
1377                        if __debug__: self._log('EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1378                            client_address, err.errno, err.strerror))
1379                        self._mark_connection_to_be_closed_immediately(client_info)
1380                        ok = False
1381                    else:
1382                        if 'nt' == os.name:
1383                            if errno.WSAECONNRESET == err.errno:
1384                                # An existing connection was forcibly closed by the remote host
1385                                if __debug__: self._log(
1386                                    'CLOSING {}: Connection reset by peer'.format(client_address))
1387                                if __debug__: self._log(
1388                                    'EXCEPTION: ACCEPT CLIENT: GETTING CONNECTION INFO:"{}", {}, {}'.format(
1389                                        client_address, err.errno, err.strerror))
1390                                self._mark_connection_to_be_closed_immediately(client_info)
1391                                ok = False
1392                            else:
1393                                raise err
1394                        else:
1395                            raise err
1396
1397                client_info.addr_info = addr_info
1398                client_info.host_names = host_names
1399                self._log_new_connection(client_info, True)
1400
1401                if self.need_to_auto_check_incoming_raw_connection \
1402                        and self.raw_checker_for_new_incoming_connections(self, client_info):
1403                    # Is should be RAW:
1404                    if self.unknown_clients_are_allowed:
1405                        # Unknown clients are allowed - create Unknown Expected Client for this connection
1406                        client_info.this_is_raw_connection = True
1407                        self._unconfirmed_clients.remove(client_info.conn.result)
1408                        self._create_unknown_client_from_connection(client_info)
1409                    else:
1410                        # Unknown clients are Non allowed - close connection.
1411                        if __debug__: self._log(
1412                            'UNKNOWN CLIENT {} WILL BE CLOSED: UNKNOWN CLIENTS ARE NOT ALLOWED'.format(
1413                                client_info.addr.result
1414                            ))
1415                        self._mark_connection_to_be_closed_immediately(client_info)
1416
1417    # @profile
1418    def _read_data_from_already_connected_socket__inner__memory_optimized(self, curr_client_info: Connection,
1419                                                                          possible_messages_in_client_input_fifo=False):
1420        ok = True
1421
1422        data = curr_client_info.conn.result.recv(curr_client_info.recv_buff_size)
1423        data_len = len(data)
1424        curr_client_info.calc_new_recv_buff_size(data_len)
1425        if data:
1426            data = memoryview(data)
1427            if curr_client_info.this_is_raw_connection:
1428                curr_client_info.input_from_client.put(data)
1429                possible_messages_in_client_input_fifo = True
1430            else:
1431                curr_client_info.raw_input_from_client.add_piece_of_data(data)
1432                possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo(
1433                    curr_client_info)
1434        else:
1435            # Interpret empty result as closed connection
1436            if __debug__: self._log(
1437                'CLOSING {} after reading no data:'.format(curr_client_info.addr.result))
1438            self._mark_connection_to_be_closed_immediately(curr_client_info)
1439            ok = False
1440
1441        result = (ok, possible_messages_in_client_input_fifo)
1442        return result
1443
1444    # @profile
1445    def _read_data_from_already_connected_socket__inner__speed_optimized(self, curr_client_info: Connection,
1446                                                                         possible_messages_in_client_input_fifo=False):
1447        ok = True
1448
1449        if curr_client_info.current_memoryview_input:
1450            nbytes = 0
1451            try:
1452                nbytes = curr_client_info.conn.result.recv_into(curr_client_info.current_memoryview_input)
1453            except TimeoutError:
1454                # https://stackoverflow.com/questions/16772519/socket-recv-on-selected-socket-failing-with-etimedout
1455                pass
1456
1457            if nbytes > 0:
1458                data = curr_client_info.current_memoryview_input[:nbytes]
1459                curr_client_info.current_memoryview_input = curr_client_info.current_memoryview_input[nbytes:]
1460
1461                if curr_client_info.this_is_raw_connection:
1462                    curr_client_info.input_from_client.put(data)
1463                    possible_messages_in_client_input_fifo = True
1464                else:
1465                    curr_client_info.raw_input_from_client.add_piece_of_data(data)
1466                    possible_messages_in_client_input_fifo = self._read_messages_from_raw_input_into_fifo(
1467                        curr_client_info)
1468            else:
1469                # Interpret empty result as closed connection
1470                if __debug__: self._log(
1471                    'CLOSING {} after reading no data:'.format(curr_client_info.addr.result))
1472                self._mark_connection_to_be_closed_immediately(curr_client_info)
1473                ok = False
1474        else:
1475            input_buffer = bytearray(self.socket_read_fixed_buffer_size.result)
1476            curr_client_info.current_memoryview_input = memoryview(input_buffer)
1477
1478        result = (ok, possible_messages_in_client_input_fifo)
1479        return result
1480
1481    # @profile
1482    def _read_data_from_already_connected_socket__shell(self, readable_socket: socket.socket):
1483        possible_messages_in_client_input_fifo = False
1484
1485        curr_client_id = self._connection_by_conn[readable_socket]
1486        curr_client_info = self._connections[curr_client_id]
1487
1488        ok = True
1489        while ok:
1490            try:
1491                if self.use_speed_optimized_socket_read:
1492                    ok, possible_messages_in_client_input_fifo = \
1493                        self._read_data_from_already_connected_socket__inner__speed_optimized(
1494                            curr_client_info, possible_messages_in_client_input_fifo)
1495                else:
1496                    ok, possible_messages_in_client_input_fifo = \
1497                        self._read_data_from_already_connected_socket__inner__memory_optimized(
1498                            curr_client_info, possible_messages_in_client_input_fifo)
1499                break  # makes IO faster on 10-30% on all modes (message/raw, data/http,
1500                #   static_read_buffer/non_static_read_buffer).
1501                # Ускорение происходит за счет того, что: 1) данных оказывается не настолько много чтобы кеш процессора
1502                # переполнялся и требовалась бы работа с оперативной памятью; 2) при таком разбиении ввод и вывод
1503                # начинают происходить (на уровне ОС) одновременно. Т.е. мы взяли из входного буфера порцию данных и
1504                # пустили в обработку. В это время ввод на уровне ОС продолжается: ОС заполняет опустевшие буферы.
1505                # После обработки мы пускаем данные на отправку и забираем очередную порцию данных из входного буфера.
1506                # В это время происходит отправка данных из буферов на уровне ОС. И т.д.
1507            except BlockingIOError as err:
1508                ok = False
1509            except InterruptedError as err:
1510                pass
1511            except ConnectionError as err:
1512                # An established connection was aborted by the software in your host machine
1513                if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1514                if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1515                    curr_client_info.addr.result, err.errno, err.strerror))
1516                self._mark_connection_to_be_closed_immediately(curr_client_info)
1517                ok = False
1518            except (socket.error, OSError) as err:
1519                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
1520                    ok = False
1521                elif errno.EINTR == err.errno:
1522                    pass
1523                elif err.errno in SET_OF_CONNECTION_ERRORS:
1524                    # An established connection was aborted by the software in your host machine
1525                    if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1526                    if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1527                        curr_client_info.addr.result, err.errno, err.strerror))
1528                    self._mark_connection_to_be_closed_immediately(curr_client_info)
1529                    ok = False
1530                else:
1531                    if 'nt' == os.name:
1532                        if errno.WSAECONNRESET == err.errno:
1533                            # An existing connection was forcibly closed by the remote host
1534                            if __debug__: self._log(
1535                                'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
1536                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1537                                curr_client_info.addr.result, err.errno, err.strerror))
1538                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1539                            ok = False
1540                        elif errno.EHOSTUNREACH == err.errno:
1541                            # OSError: [Errno 113] No route to host
1542                            if __debug__: self._log(
1543                                'CLOSING {}: No route to host'.format(curr_client_info.addr.result))
1544                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1545                                curr_client_info.addr.result, err.errno, err.strerror))
1546                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1547                            ok = False
1548                        else:
1549                            if __debug__: self._log(
1550                                'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result))
1551                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1552                                curr_client_info.addr.result, err.errno, err.strerror))
1553                            raise err
1554                    else:
1555                        if errno.WSAEHOSTUNREACH == err.errno:
1556                            # OSError: [Errno 113] No route to host
1557                            if __debug__: self._log(
1558                                'CLOSING {}: No route to host'.format(curr_client_info.addr.result))
1559                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1560                                curr_client_info.addr.result, err.errno, err.strerror))
1561                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1562                            ok = False
1563                        else:
1564                            if __debug__: self._log(
1565                                'CLOSING {}: Unknown reason'.format(curr_client_info.addr.result))
1566                            if __debug__: self._log('EXCEPTION: READ DATA FROM SOCKET: "{}", {}, {}'.format(
1567                                curr_client_info.addr.result, err.errno, err.strerror))
1568                            self._mark_connection_to_be_closed_immediately(curr_client_info)
1569                            ok = False
1570                            # raise err
1571
1572            read_is_forbidden = False
1573            if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \
1574                    > self.global_in__data_size_limit.result:
1575                read_is_forbidden = True
1576                ok = False
1577
1578            read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
1579                read_is_forbidden)
1580            if read_is_forbidden_test is not None:
1581                if read_is_forbidden_test:
1582                    print('Read is suppressed until data will be processed.')
1583                else:
1584                    print('Read is allowed: data is processed.')
1585
1586        return possible_messages_in_client_input_fifo
1587
1588    # @profile
1589    def _read_data_from_socket(self, readable_socket: socket.socket):
1590        possible_messages_in_client_input_fifo = False
1591        if readable_socket in self._gate:
1592            accept_is_forbidden = True
1593            if (self.global_in__data_full_size.result + self.global_out__data_full_size.result) \
1594                    <= self.global__data_size_limit.result:
1595                accept_is_forbidden = False
1596                self._accept_new_connection(readable_socket)
1597
1598            if __debug__:
1599                accept_is_forbidden_test = \
1600                    self.show_inform_about_accept_stop_because_of_all_buffers_size_limit.test_trigger(
1601                        accept_is_forbidden)
1602                if accept_is_forbidden_test is not None:
1603                    if accept_is_forbidden_test:
1604                        print('Accept is suppressed until data will be processed and/or out.')
1605                    else:
1606                        print('Accept is allowed: data is processed and/or out.')
1607        else:
1608            possible_messages_in_client_input_fifo = self._read_data_from_already_connected_socket__shell(
1609                readable_socket)
1610
1611        return possible_messages_in_client_input_fifo
1612
1613    def _log_new_connection(self, client_info: Connection, is_incoming_connection):
1614        connection = client_info.conn.result
1615        client_address = client_info.addr.result
1616        addr_info = client_info.addr_info
1617        host_names = client_info.host_names
1618
1619        connection_type_string = 'OUTGOING'
1620        from_to = 'to'
1621        if is_incoming_connection:
1622            connection_type_string = 'INCOMING'
1623            from_to = 'from'
1624
1625        if __debug__: self._log('New {} connection {} {}'.format(connection_type_string, from_to, client_address))
1626        if connection.family in {socket.AF_INET, socket.AF_INET6}:
1627            if __debug__: self._log('\taddr_info: {}'.format(addr_info))
1628            if __debug__: self._log('\thost_names: {}'.format(host_names))
1629
1630    # @profile
    def _write_data_to_socket(self, curr_client_info: Connection):
        """Send as much queued output of the connection as the socket currently accepts.

        The loop alternates between two steps: sending the current output memoryview
        (``current_memoryview_output``) and, once it is exhausted, taking the next data from
        ``output_to_client``. When nothing is left to send, the connected expected client (if
        any) is recorded in ``clients_with_empty_output_fifo`` and its inline processor gets at
        most one chance per call to produce more output. The loop stops when the socket would
        block or when there is nothing more to send.

        Connection errors mark the connection to be closed; any other ``OSError`` is re-raised.
        Nothing is sent for a connection that is already marked ``should_be_closed``.

        :param curr_client_info: connection whose pending output should be written.
        :raises OSError: for socket errors that are not recognised below.
        """
        # CAUTION: code here is optimized for speed - not for readability or beauty.

        expected_client_info = curr_client_info.connected_expected_client
        writable_socket = curr_client_info.conn.result
        if curr_client_info.should_be_closed:
            curr_client_info.current_memoryview_output = None
            # NOTE(review): the output FIFO is replaced with a shallow copy of itself; the
            # purpose is not evident from this method - confirm against the FIFO class.
            curr_client_info.output_to_client = copy.copy(curr_client_info.output_to_client)
            return

        ok = True
        first_pass = True
        # The inline processor may be asked for more output only once per call.
        can_call__inline_processor__on__output_buffers_are_empty = True
        while ok:
            try:
                if curr_client_info.current_memoryview_output:
                    # Send what is left of the current buffer and keep only the unsent tail.
                    nsent = writable_socket.send(curr_client_info.current_memoryview_output)
                    curr_client_info.current_memoryview_output = curr_client_info.current_memoryview_output[nsent:]
                else:
                    # Current buffer is exhausted: try to take the next one from the FIFO.
                    curr_client_info.current_memoryview_output = None
                    output_fifo_size = curr_client_info.output_to_client.size()
                    if output_fifo_size > 1:
                        # presumably takes queued pieces totalling at least 512 KiB - verify
                        # against the FIFO implementation
                        result_data, result_size, result_qnt = \
                            curr_client_info.output_to_client.get_at_least_size(524288)
                        if result_qnt > 1:
                            # Several pieces: join them so that they go out in a single send().
                            curr_client_info.current_memoryview_output = memoryview(b''.join(result_data))
                        else:
                            curr_client_info.current_memoryview_output = memoryview(result_data.popleft())
                    elif output_fifo_size == 1:
                        curr_client_info.current_memoryview_output = memoryview(curr_client_info.output_to_client.get())

                    if curr_client_info.current_memoryview_output is None:
                        # Nothing left to send.
                        if not curr_client_info.ready_to_be_closed:
                            # If the connection is marked as "ready to be closed", we have to wait
                            # until the data has been sent and close the connection only at that
                            # moment; in that case the socket must stay in the set of sockets
                            # checked for output. Otherwise it can be removed from that set now.
                            # NOTE(review): if the inline processor below returns True without
                            # re-adding the socket, a second pass reaches this remove() again -
                            # confirm that cannot raise.
                            self._output_check_sockets.remove(writable_socket)
                        if first_pass and curr_client_info.ready_to_be_closed:
                            # There was no data to send even on the first pass of the loop, i.e.
                            # from the very beginning. This means that all data has been sent and
                            # the connection can be closed.
                            self._output_check_sockets.remove(writable_socket)
                            self._mark_connection_to_be_closed_immediately(curr_client_info)
                        ok = False
                        if expected_client_info:
                            self._io_iteration_result.clients_with_empty_output_fifo.add(
                                curr_client_info.connected_expected_client_id)
                            if curr_client_info.has_inline_processor:
                                if can_call__inline_processor__on__output_buffers_are_empty:
                                    # A truthy result keeps the loop running for another pass.
                                    if self._inline_processor__on__output_buffers_are_empty(curr_client_info,
                                                                                            expected_client_info):
                                        ok = True
                                    can_call__inline_processor__on__output_buffers_are_empty = False
            except BlockingIOError as err:
                # The socket cannot take more data right now.
                ok = False
            except InterruptedError as err:
                # Interrupted by a signal: retry on the next pass.
                pass
            except ConnectionError as err:
                # An established connection was aborted by the software in your host machine
                if __debug__: self._log('CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
                if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
                    curr_client_info.addr.result, err.errno, err.strerror))
                self._mark_connection_to_be_closed_immediately(curr_client_info)
                ok = False
            except (socket.error, OSError) as err:
                if (errno.EAGAIN == err.errno) or (errno.EWOULDBLOCK == err.errno):
                    ok = False
                elif errno.EINTR == err.errno:
                    pass
                elif err.errno in SET_OF_CONNECTION_ERRORS:
                    # Connection reset by peer
                    if __debug__: self._log(
                        'CLOSING {}: Connection reset by peer ({})'.format(curr_client_info.addr.result,
                                                                           err.strerror))
                    if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
                        curr_client_info.addr.result, err.errno, err.strerror))
                    self._mark_connection_to_be_closed_immediately(curr_client_info)
                    ok = False
                else:
                    if 'nt' == os.name:
                        # errno.WSAECONNRESET is only looked up on Windows.
                        if errno.WSAECONNRESET == err.errno:
                            # An existing connection was forcibly closed by the remote host
                            if __debug__: self._log(
                                'CLOSING {}: Connection reset by peer'.format(curr_client_info.addr.result))
                            if __debug__: self._log('EXCEPTION: WRITE DATA TO SOCKET: "{}", {}, {}'.format(
                                curr_client_info.addr.result, err.errno, err.strerror))
                            self._mark_connection_to_be_closed_immediately(curr_client_info)
                            ok = False
                        else:
                            raise err
                    else:
                        raise err
            first_pass = False
1734
1735    def _mark_connection_to_be_closed_immediately(self, client_info: Connection):
1736        client_info.should_be_closed = True
1737        client_info.current_memoryview_input = None
1738        self._connections_marked_to_be_closed_immediately.add(client_info.conn.result)
1739        if client_info.connected_expected_client_id is not None:
1740            self._io_iteration_result.clients_with_disconnected_connection.add(
1741                client_info.connected_expected_client_id)
1742
1743    def _mark_connection_as_ready_to_be_closed(self, client_info: Connection):
1744        client_info.ready_to_be_closed = True
1745
1746    def _mark_connection_as_ready_for_deletion(self, client_info: Connection):
1747        client_info.ready_for_deletion = True
1748        self._connections_marked_as_ready_to_be_deleted.add(client_info.conn.result)
1749
1750    def _handle_connection_error(self, writable_socket: socket.socket):
1751        # Data read from already connected client
1752        curr_client_id = self._connection_by_conn[writable_socket]
1753        curr_client_info = self._connections[curr_client_id]
1754        if curr_client_info.should_be_closed:
1755            return
1756        if __debug__: self._log('handling exceptional condition for {}'.format(curr_client_info.addr.result))
1757        self._mark_connection_to_be_closed_immediately(curr_client_info)
1758
1759    # @profile
1760    def _read_messages_from_raw_input_into_fifo(self, curr_client_info: Connection):
1761        result = False
1762
1763        try:
1764            while True:
1765                if curr_client_info.current_message_length is None:
1766                    current_message_length = curr_client_info.raw_input_from_client.get_data(self.message_size_len)
1767                    if current_message_length is None:
1768                        break
1769
1770                    curr_client_info.current_message_length = int.from_bytes(current_message_length, 'little')
1771
1772                current_message = curr_client_info.raw_input_from_client.get_data(
1773                    curr_client_info.current_message_length)
1774                if current_message is None:
1775                    break
1776                else:
1777                    curr_client_info.input_from_client.put(current_message)
1778                    curr_client_info.current_message_length = None
1779                    result = True
1780        except TypeError:
1781            pass
1782
1783        return result
1784
1785    def _send_message_through_connection(self, client_info: Connection, data):
1786        if client_info.conn.existence:
1787            client_info.output_to_client.put(len(data).to_bytes(self.message_size_len, 'little'))
1788            client_info.output_to_client.put(data)
1789
1790            self._output_check_sockets.add(client_info.conn.result)
1791        else:
1792            if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data))
1793            raise Exception('EXCEPTION: SEND MESSAGE TO CLIENT: Client is disconnected! You can not send data to him!')
1794
1795    def _generate_list_of_messages_with_their_length(self, messages_list):
1796        for message in messages_list:
1797            yield len(message).to_bytes(self.message_size_len, 'little')
1798            yield message
1799
1800    def _send_messages_through_connection(self, client_info: Connection, messages_list):
1801        if client_info.conn.existence:
1802            client_info.output_to_client.extend(self._generate_list_of_messages_with_their_length(messages_list))
1803
1804            self._output_check_sockets.add(client_info.conn.result)
1805        else:
1806            if __debug__: self._log(
1807                'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list))
1808            raise Exception('EXCEPTION: SEND MESSAGES TO CLIENT: Client is disconnected! You can not send data to him!')
1809
1810    def _send_message_through_connection_raw(self, client_info: Connection, data):
1811        if client_info.conn.existence:
1812            client_info.output_to_client.put(data)
1813            self._output_check_sockets.add(client_info.conn.result)
1814        else:
1815            if __debug__: self._log('ERROR: SEND MESSAGE TO CLIENT {}: "{}"'.format(client_info.addr.result, data))
1816            raise Exception(
1817                'EXCEPTION: SEND MESSAGE TO CLIENT RAW: Client is disconnected! You can not send data to him!')
1818
1819    def _send_messages_through_connection_raw(self, client_info: Connection, messages_list):
1820        if client_info.conn.existence:
1821            client_info.output_to_client.extend(messages_list)
1822            self._output_check_sockets.add(client_info.conn.result)
1823        else:
1824            if __debug__: self._log(
1825                'ERROR: SEND MESSAGES TO CLIENT {}: "{}"'.format(client_info.addr.result, messages_list))
1826            raise Exception(
1827                'EXCEPTION: SEND MESSAGES TO CLIENT RAW: Client is disconnected! You can not send data to him!')
1828
1829    def _move_message_from_fifo_to_memoryview(self, client_info: Connection):
1830        if client_info.current_memoryview_output is None:
1831            if client_info.output_to_client.size():
1832                client_info.current_memoryview_output = memoryview(client_info.output_to_client.get())
1833
1834    # @profile
1835    def _consolidate_and_move_messages_from_fifo_to_memoryview(self, client_info: Connection):
1836        output_fifo_size = client_info.output_to_client.size()
1837        if output_fifo_size > 1:
1838            result_data, result_size, result_qnt = \
1839                client_info.output_to_client.get_at_least_size(524288)
1840            if result_qnt > 1:
1841                client_info.current_memoryview_output = memoryview(b''.join(result_data))
1842            else:
1843                client_info.current_memoryview_output = memoryview(result_data.popleft())
1844        elif output_fifo_size == 1:
1845            client_info.current_memoryview_output = memoryview(client_info.output_to_client.get())
1846
1847    def _process_client_keyword(self, client_socket: socket.socket):
1848        curr_client_id = self._connection_by_conn[client_socket]
1849        curr_client_info = self._connections[curr_client_id]
1850
1851        if curr_client_info.input_from_client.size() >= 0:
1852            expected_client_id = None
1853            expected_client_info = None
1854
1855            this_is_super_server_client = False
1856            if curr_client_info.connected_expected_client is not None:
1857                expected_client_info = curr_client_info.connected_expected_client
1858                expected_client_id = expected_client_info.id
1859                if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role:
1860                    this_is_super_server_client = True
1861
1862            if this_is_super_server_client:
1863                # This is connection to Super-Server. So we expect an answer like b'OK'
1864                super_server_answer__keyword_accepted = curr_client_info.input_from_client.get()
1865                super_server_answer__keyword_accepted = bytes(super_server_answer__keyword_accepted)
1866                if super_server_answer__keyword_accepted == self.server_answer__keyword_accepted:
1867                    # Answer was acceptable
1868                    self._unconfirmed_clients.remove(client_socket)
1869                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id)
1870                    if expected_client_info.will_use_raw_client_connection:
1871                        curr_client_info.this_is_raw_connection = True
1872                else:
1873                    # Answer was NOT acceptable
1874                    self._mark_connection_to_be_closed_immediately(curr_client_info)
1875                    self._mark_connection_as_ready_for_deletion(curr_client_info)
1876                    if __debug__: self._log('ERROR: SUPER SERVER ANSWER - KEYWORD WAS NOT ACCEPTED: {}'.format(
1877                        super_server_answer__keyword_accepted))
1878            else:
1879                # This is connection to client. So we expect a keyword
1880                keyword = curr_client_info.input_from_client.get()
1881                keyword = bytes(keyword)
1882                curr_client_info.keyword = keyword
1883                self._unconfirmed_clients.remove(client_socket)
1884                self._send_message_through_connection(curr_client_info, self.server_answer__keyword_accepted)
1885
1886                if keyword in self._keywords_for_expected_clients:
1887                    # empty expected client was already registered
1888                    expected_client_id = self._keywords_for_expected_clients[keyword]
1889                    expected_client_info = self._expected_clients[expected_client_id]
1890                    expected_client_info.connection_id = curr_client_id
1891                    expected_client_info._Client__connection = curr_client_info
1892                    self._conns_of_expected_clients[client_socket] = expected_client_id
1893                    curr_client_info.connected_expected_client_id = expected_client_id
1894                    curr_client_info.connected_expected_client = expected_client_info
1895                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_id)
1896                    if expected_client_info.will_use_raw_client_connection:
1897                        curr_client_info.this_is_raw_connection = True
1898                else:
1899                    # it is unknown expected client
1900                    if self.unexpected_clients_are_allowed:
1901                        self._add_unexpected_client(curr_client_id, keyword)
1902                    else:
1903                        self._mark_connection_to_be_closed_immediately(curr_client_info)
1904                        self._mark_connection_as_ready_for_deletion(curr_client_info)
1905
1906    def _check_is_client_have_data_to_read_in_fifo(self, readable_socket: socket.socket):
1907        client_info = self._connections[self._connection_by_conn[readable_socket]]
1908        if client_info.connected_expected_client_id is not None:
1909            if client_info.input_from_client.size():
1910                self._io_iteration_result.clients_have_data_to_read.add(
1911                    client_info.connected_expected_client_id)
1912                if client_info.has_inline_processor:
1913                    self._inline_processor__on__data_received(client_info)
1914
1915    def _client_have_data_to_read_in_fifo(self, readable_socket: socket.socket):
1916        if readable_socket in self._conns_of_expected_clients:
1917            expected_client_id = self._conns_of_expected_clients[readable_socket]
1918            expected_client = self._expected_clients[expected_client_id]
1919            self._io_iteration_result.clients_have_data_to_read.add(expected_client_id)
1920            client_info = expected_client._Client__connection
1921            if client_info.has_inline_processor:
1922                self._inline_processor__on__data_received(client_info)
1923
1924    def _inline_processor__apply_parameters(self, connection_info: Connection,
1925                                            expected_client: Client):
1926        inline_processor = expected_client.obj_for_inline_processing
1927
1928        inline_processor.is_in_raw_mode = inline_processor._InlineProcessor__set__is_in_raw_mode
1929        connection_info.this_is_raw_connection = inline_processor._InlineProcessor__set__is_in_raw_mode
1930
1931        if inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately:
1932            inline_processor._InlineProcessor__set__mark_socket_as_should_be_closed_immediately = False
1933            self._mark_connection_to_be_closed_immediately(connection_info)
1934
1935        if inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed:
1936            inline_processor._InlineProcessor__set__mark_socket_as_ready_to_be_closed = False
1937            self._mark_connection_as_ready_to_be_closed(connection_info)
1938
1939    def _inline_processor__init_parameters(self, connection_info: Connection,
1940                                           expected_client: Client):
1941        inline_processor = expected_client.obj_for_inline_processing
1942
1943        inline_processor.is_in_raw_mode = connection_info.this_is_raw_connection
1944        inline_processor._InlineProcessor__set__is_in_raw_mode = connection_info.this_is_raw_connection
1945
1946    def _inline_processor__on__data_received(self, connection_info: Connection):
1947        expected_client = connection_info.connected_expected_client
1948        inline_processor = expected_client.obj_for_inline_processing
1949
1950        try:
1951            while connection_info.input_from_client.size():
1952                inline_processor.on__data_received(connection_info.input_from_client.get())
1953
1954            if inline_processor.output_messages:
1955                while inline_processor.output_messages:
1956                    another_message = inline_processor.output_messages.popleft()
1957                    if not connection_info.this_is_raw_connection:
1958                        connection_info.output_to_client.put(
1959                            len(another_message).to_bytes(self.message_size_len, 'little'))
1960                    connection_info.output_to_client.put(another_message)
1961                self._output_check_sockets.add(connection_info.conn.result)
1962                if connection_info.output_to_client.get_data_full_size() >= 65536:
1963                    self._write_data_to_socket(connection_info)
1964                return True
1965        except:
1966            self.remove_client(expected_client.id)
1967            exc = sys.exc_info()
1968            exception = exc
1969            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
1970            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
1971            exception = exception[:2] + (formatted_traceback,)
1972            trace_str = ''.join(exception[2])
1973            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
1974            if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON DATA RECEIVED: {}'.format(result_string))
1975
1976        return False
1977
1978    def _inline_processor__on__output_buffers_are_empty(self, connection_info: Connection,
1979                                                        expected_client: Client):
1980        inline_processor = expected_client.obj_for_inline_processing
1981
1982        if not connection_info.has_inline_processor:
1983            return False
1984
1985        try:
1986            inline_processor.on__output_buffers_are_empty()
1987
1988            if inline_processor.output_messages:
1989                while inline_processor.output_messages:
1990                    another_message = inline_processor.output_messages.popleft()
1991                    if not connection_info.this_is_raw_connection:
1992                        connection_info.output_to_client.put(
1993                            len(another_message).to_bytes(self.message_size_len, 'little'))
1994                    connection_info.output_to_client.put(another_message)
1995                self._output_check_sockets.add(connection_info.conn.result)
1996                if connection_info.output_to_client.get_data_full_size() >= 65536:
1997                    # self._write_data_to_socket(connection_info.conn.result)
1998                    self._write_data_to_socket(connection_info)
1999                return True
2000        except:
2001            self.remove_client(expected_client.id)
2002            exc = sys.exc_info()
2003            exception = exc
2004            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
2005            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
2006            exception = exception[:2] + (formatted_traceback,)
2007            trace_str = ''.join(exception[2])
2008            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
2009            if __debug__: self._log(
2010                'EXCEPTION: INLINE PROCESSOR: ON OUTPUT BUFFERS ARE EMPTY: {}'.format(result_string))
2011
2012        return False
2013
2014    def _inline_processor__on__connection_lost(self, connection_info: Connection,
2015                                               expected_client: Client):
2016        inline_processor = expected_client.obj_for_inline_processing
2017
2018        if not connection_info.has_inline_processor:
2019            return False
2020
2021        try:
2022            inline_processor.on__connection_lost()
2023        except:
2024            exc = sys.exc_info()
2025            exception = exc
2026            error_str = '{} {}'.format(str(exception[0]), str(exception[1].args[0]))
2027            formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
2028            exception = exception[:2] + (formatted_traceback,)
2029            trace_str = ''.join(exception[2])
2030            result_string = '\n\tEXCEPTION:{}\n\tTRACE:{}'.format(error_str, trace_str)
2031            if __debug__: self._log('EXCEPTION: INLINE PROCESSOR: ON CONNECTION LOST: {}'.format(result_string))
2032        self.remove_client(expected_client.id)
2033
2034    def _inline_processor__on__connection_lost_by_connection_id(self, connection_id):
2035        connection_info = self._connections[connection_id]
2036        expected_client_info = connection_info.connected_expected_client
2037        if (expected_client_info is not None) and connection_info.has_inline_processor:
2038            self._inline_processor__on__connection_lost(connection_info, expected_client_info)
2039            self._io_iteration_result.clients_with_disconnected_connection.remove(
2040                expected_client_info.id)
2041
2042    def _unlink_good_af_unix_sockets(self):
2043        if 'posix' == os.name:
2044            for gate_connection_settings in self.gates_connections_settings:
2045                if gate_connection_settings.socket_family == socket.AF_UNIX:
2046                    try:
2047                        os.unlink(gate_connection_settings.socket_address)
2048                    except:
2049                        if __debug__: self._log('EXCEPTION: SERVER END: TRYING TO UNLINK GOOD AF_UNIX GATE: {}'.format(
2050                            gate_connection_settings.socket_address))
2051                        raise
2052
2053    def _check_for_initial_af_unix_socket_unlink(self, connection_settings: ConnectionSettings):
2054        if 'posix' == os.name:
2055            if connection_settings.socket_family == socket.AF_UNIX:
2056                if os.path.exists(connection_settings.socket_address):
2057                    if __debug__: self._log('EXCEPTION: INITIATION: GATE: AF_UNIX SOCKET IS ALREADY EXIST: {}'.format(
2058                        connection_settings.socket_address))
ASockIOCore( gates_connections_settings: typing.Set[cengal.io.asock_io.versions.v_0.base.ConnectionSettings])
293    def __init__(self, gates_connections_settings: Set[ConnectionSettings]):
294        """
295        Port should not be open to a external world!
296        :param gates_connections_settings: set() of ConnectionSettings()
297        :return:
298        """
299        super(ASockIOCore, self).__init__()
300
301        if os.name != 'nt':
302            self.po = select.poll()
303        self.last_all_sockets = set()  # type: Set[int]
304        self.socket_by_fd = dict()  # type: Dict[int, socket.socket]
305
306        self.check_sockets_sum_time = 0.0
307        self.check_sockets_qnt = 0
308        self.check_sockets_max_time = 0.0
309
310        self.gates_connections_settings = gates_connections_settings
311        if not self.gates_connections_settings:
312            self.gates_connections_settings = set()
313            # raise Exception('gates_connections_settings should be provided!')
314        for gates_connections_settings in self.gates_connections_settings:
315            gates_connections_settings.check()
316        self.faulty_connection_settings = set()
317        self._connection_settings_by_gate_conn = dict()
318
319        self.set_of_gate_addresses = set()
320        self._gate = set()
321        self.reuse_gate_addr = False
322        self.reuse_gate_port = False
323
324        self.message_size_len = MESSAGE_SIZE_LEN
325        self.server_answer__keyword_accepted = SERVER_ANSWER__KEYWORD_ACCEPTED
326
327        self._connections = dict()  # key: ID; data: Connection()
328        self._connection_by_conn = dict()  # key: conn; data: ID
329        self._connections_id_gen = IDGenerator()
330
331        self._connections_marked_as_ready_to_be_closed = set()
332        self._connections_marked_to_be_closed_immediately = set()
333        self._connections_marked_as_ready_to_be_deleted = set()
334
335        self._unconfirmed_clients = set()
336
337        self._we_have_connections_for_select = False
338        self._input_check_sockets = set()
339        self._output_check_sockets = set()
340        self._exception_check_sockets = set()
341
342        # ID (GUID) и другая информация клиентов, подключение которых явно ожидается
343        self._expected_clients = dict()  # key: ID; data: Client()
344        self._expected_clients_id_gen = IDGenerator()
345        self._keywords_for_expected_clients = dict()  # key: keyword; data: info
346        self._conns_of_expected_clients = dict()  # key: conn; data expected_client_ID
347
348        self.unexpected_clients_are_allowed = True
349
350        # Список неопознанных и неожидаемых клиентов. Если клиент выдал свой GUID и позже кто-то добавил этот GUID в
351        # список ожидаемых клиентов - данный клиент будет автоматически подхвачен.
352        self._unexpected_clients = dict()  # key: ID; data: Client()
353        self._unexpected_clients_id_gen = IDGenerator()
354        self._keywords_of_unexpected_clients = dict()
355        self._conns_of_unexpected_clients = dict()
356
357        self._io_iteration_result = IoIterationResult()
358
359        self.raw_checker_for_new_incoming_connections = CheckIsRawConnection()
360        self.need_to_auto_check_incoming_raw_connection = False
361        self.unknown_clients_are_allowed = False
362        self._unknown_clients_keyword_gen = IDGenerator(GeneratorType.guid_string)
363        self.prefix_for_unknown_client_keywords = b'UNKNOWN CLIENT: '
364
365        self.echo_log = False
366        self._internal_log = deque()
367
368        self.recv_sizes = deque()
369        self.recv_buff_sizes = deque()
370
371        self.should_get_client_addr_info_on_connection = True
372
373        self.use_nodelay_inet = False
374        self.use_speed_optimized_socket_read = False
375
376        self.show_inform_about_accept_stop_because_of_all_buffers_size_limit = \
377            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
378        self.show_inform_about_read_stop_because_of_in_buffer_size_limit = \
379            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
380        self.show_inform_about_work_stop_because_of_out_buffer_size_limit = \
381            FrontTriggerableVariable(FrontTriggerableVariableType.equal, True)
382
383        self.class_for_unknown_clients_inline_processing = None
384
385        self._clients_with_inline_processors_that_need_to_apply_parameters = set()

Port should not be open to an external world! :param gates_connections_settings: set() of ConnectionSettings() :return:

last_all_sockets
socket_by_fd
check_sockets_sum_time
check_sockets_qnt
check_sockets_max_time
gates_connections_settings
faulty_connection_settings
set_of_gate_addresses
reuse_gate_addr
reuse_gate_port
message_size_len
server_answer__keyword_accepted
unexpected_clients_are_allowed
raw_checker_for_new_incoming_connections
need_to_auto_check_incoming_raw_connection
unknown_clients_are_allowed
prefix_for_unknown_client_keywords
echo_log
recv_sizes
recv_buff_sizes
should_get_client_addr_info_on_connection
use_nodelay_inet
use_speed_optimized_socket_read
show_inform_about_accept_stop_because_of_all_buffers_size_limit
show_inform_about_read_stop_because_of_in_buffer_size_limit
show_inform_about_work_stop_because_of_out_buffer_size_limit
class_for_unknown_clients_inline_processing
@staticmethod
def check_sockets_select( read: typing.Set[int], write: typing.Set[int], error: typing.Set[int], timeout: float) -> Tuple[Set[int], Set[int], Set[int]]:
387    @staticmethod
388    def check_sockets_select(read: Set[int], write: Set[int], error: Set[int],
389                             timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
390        all_sockets = read | write | error
391        if all_sockets:
392            return select.select(read,
393                                 write,
394                                 error,
395                                 timeout)
396        else:
397            return set(), set(), set()
def check_sockets_poll( self, read: typing.Set[int], write: typing.Set[int], error: typing.Set[int], timeout: float) -> Tuple[Set[int], Set[int], Set[int]]:
399    def check_sockets_poll(self, read: Set[int], write: Set[int], error: Set[int],
400                           timeout: float)->Tuple[Set[int], Set[int], Set[int]]:
401        read_events = select.POLLIN | select.POLLPRI
402        write_events = select.POLLOUT
403        except_events = select.POLLERR | select.POLLHUP | select.POLLNVAL
404        if hasattr(select, 'POLLRDHUP'):
405            except_events |= select.POLLRDHUP
406        readable_events = {select.POLLIN, select.POLLPRI}
407        writable_events = {select.POLLOUT}
408        exceptional_events = {select.POLLERR, select.POLLHUP, select.POLLNVAL}
409        if hasattr(select, 'POLLRDHUP'):
410            exceptional_events.add(select.POLLRDHUP)
411        all_events_set = readable_events | writable_events | exceptional_events
412
413        timeout = int(timeout * 1000)
414
415        # print('>>> POLL {}: last_all_sockets: {}'.format(time.perf_counter(), self.last_all_sockets))
416        all_sockets = read | write | error
417        # print('>>> POLL {}: all_sockets: {}'.format(time.perf_counter(), all_sockets))
418        new_sockets = all_sockets - self.last_all_sockets
419        # print('>>> POLL {}: new_sockets: {}'.format(time.perf_counter(), new_sockets))
420        still_sockets = all_sockets & self.last_all_sockets
421        # print('>>> POLL {}: still_sockets: {}'.format(time.perf_counter(), still_sockets))
422        deleted_sockets = self.last_all_sockets - all_sockets
423        # print('>>> POLL {}: deleted_sockets: {}'.format(time.perf_counter(), deleted_sockets))
424        self.last_all_sockets = all_sockets
425
426        for socket_fd in new_sockets:
427            event_mask = 0
428            if socket_fd in read:
429                event_mask |= read_events
430            if socket_fd in write:
431                event_mask |= write_events
432            if socket_fd in error:
433                event_mask |= except_events
434            # print('>>> POLL {}: new_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
435            self.po.register(socket_fd, event_mask)
436
437        for socket_fd in still_sockets:
438            event_mask = 0
439            if socket_fd in read:
440                event_mask |= read_events
441            if socket_fd in write:
442                event_mask |= write_events
443            if socket_fd in error:
444                event_mask |= except_events
445            # print('>>> POLL {}: still_socket: {}; event_mask: {}'.format(time.perf_counter(), socket_fd, event_mask))
446            self.po.modify(socket_fd, event_mask)
447
448        for socket_fd in deleted_sockets:
449            # print('>>> POLL {}: deleted_socket: {}'.format(time.perf_counter(), socket_fd))
450            self.po.unregister(socket_fd)
451
452        poll_result = self.po.poll(timeout)
453        # print('>>> POLL {}: result: {}'.format(time.perf_counter(), poll_result))
454        # sys.stdout.flush()
455
456        readable = set()
457        writable = set()
458        exceptional = set()
459        for socket_fd, event_mask in poll_result:
460            socket_events_set = set()
461            for another_event in all_events_set:
462                if event_mask & another_event:
463                    socket_events_set.add(another_event)
464
465            if socket_events_set & readable_events:
466                readable.add(socket_fd)
467            if socket_events_set & writable_events:
468                writable.add(socket_fd)
469            if socket_events_set & exceptional_events:
470                exceptional.add(socket_fd)
471
472        return readable, writable, exceptional
def check_sockets( self, read: typing.Set[socket.socket], write: typing.Set[socket.socket], error: typing.Set[socket.socket], timeout: float) -> Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]:
474    def check_sockets(self, read: Set[socket.socket], write: Set[socket.socket], error: Set[socket.socket],
475                      timeout: float)->Tuple[Set[socket.socket], Set[socket.socket], Set[socket.socket]]:
476        all_sockets = read | write | error
477        if all_sockets:
478            read_fd = set()
479            write_fd = set()
480            error_fd = set()
481            for conn in read:
482                read_fd.add(conn.fileno())
483            for conn in write:
484                write_fd.add(conn.fileno())
485            for conn in error:
486                error_fd.add(conn.fileno())
487
488            check_sockets = self.check_sockets_select
489            if os.name != 'nt':
490                check_sockets = self.check_sockets_poll
491
492            readable_fd, writable_fd, exceptional_fd = check_sockets(read_fd,
493                                                                     write_fd,
494                                                                     error_fd,
495                                                                     timeout)
496            readable = set()
497            writable = set()
498            exceptional = set()
499            for fd in readable_fd:
500                readable.add(self.socket_by_fd[fd])
501            for fd in writable_fd:
502                writable.add(self.socket_by_fd[fd])
503            for fd in exceptional_fd:
504                exceptional.add(self.socket_by_fd[fd])
505            return readable, writable, exceptional
506        else:
507            return set(), set(), set()
def gate_io_iteration(self, timeout=0.0):
509    def gate_io_iteration(self, timeout=0.0):
510        result = self._io_iteration_result
511        if self._gate:
512            readable, writable, exceptional = self.check_sockets_select(self._gate,
513                                                                        set(),
514                                                                        set(),
515                                                                        timeout)
516
517            # Handle inputs
518            for s in readable:
519                self._read_data_from_socket(s)
520
521        self._io_iteration_result = IoIterationResult()
522        return result
def io_iteration(self, timeout=0.0):
    def io_iteration(self, timeout=0.0):
        """
        Run one pass of the main IO loop: poll the registered sockets, read incoming data, write
        pending output, handle sockets reported as exceptional, apply pending inline processor
        parameters and finally close/remove the connections that were marked for it.

        Nothing is polled when self._we_have_connections_for_select is false.

        :param timeout: timeout in seconds, passed through to self.check_sockets()
        :return: the object held in self._io_iteration_result when the call started; a fresh
            IoIterationResult() is stored in its place before returning
        """
        result = self._io_iteration_result

        if self._we_have_connections_for_select:
            # need_to_process = False
            # all_sockets = self._input_check_sockets | self._output_check_sockets | self._exception_check_sockets
            # if not (all_sockets - self._gate):
            #     timeout = 0.01

            need_to_repeat = True

            while need_to_repeat:
                output_check_sockets = set()

                # Writable sockets are polled only if at least one connection has queued output;
                # otherwise an empty set is passed for the write check.
                need_to_check_writable_sockets = False
                for s in self._output_check_sockets:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    if curr_client_info.output_to_client.size():
                        need_to_check_writable_sockets = True
                        break

                if need_to_check_writable_sockets:
                    output_check_sockets = self._output_check_sockets

                # print('>>> POLL {}: ri: {}, wi: {}, ei: {}'.format(time.perf_counter(),
                #                                                    len(self._input_check_sockets),
                #                                                    len(self._output_check_sockets),
                #                                                    len(self._exception_check_sockets)))
                # sys.stdout.flush()
                # Poll the sockets and accumulate timing statistics (sum, count, max) of the poll call.
                check_sockets_start_time = time.perf_counter()
                readable, writable, exceptional = self.check_sockets(self._input_check_sockets,
                                                                     output_check_sockets,
                                                                     self._exception_check_sockets,
                                                                     timeout)
                check_sockets_finish_time = time.perf_counter()
                check_sockets_delta_time = check_sockets_finish_time - check_sockets_start_time
                self.check_sockets_sum_time += check_sockets_delta_time
                self.check_sockets_qnt += 1
                if self.check_sockets_max_time < check_sockets_delta_time:
                    self.check_sockets_max_time = check_sockets_delta_time
                # NOTE(review): the average is only consumed by the commented-out print below.
                check_socket_average_time = self.check_sockets_sum_time / self.check_sockets_qnt
                # print('>>> CHECK SOCKET: DELTA {}: AVG: {}; SUM: {}; MAX: {}'.format(
                #     check_sockets_delta_time,
                #     check_socket_average_time,
                #     self.check_sockets_sum_time,
                #     self.check_sockets_max_time
                # ))
                # print('>>> POLL {}: ro: {}, wo: {}, eo: {}'.format(time.perf_counter(),
                #                                                    len(readable),
                #                                                    len(writable),
                #                                                    len(exceptional)))
                # sys.stdout.flush()

                # Input back-pressure: reading is skipped for this pass while the amount of
                # non-deletable input data exceeds the configured input size limit.
                read_is_forbidden = True
                if (self.global_in__data_full_size.result - self.global_in__deletable_data_full_size.result) \
                        <= self.global_in__data_size_limit.result:
                    read_is_forbidden = False

                    # Handle inputs
                    for s in readable:
                        read_result = self._read_data_from_socket(s)
                        if read_result:
                            # Sockets still in self._unconfirmed_clients get their keyword
                            # processed first; all others are reported as having readable data.
                            if s in self._unconfirmed_clients:
                                self._process_client_keyword(s)
                                self._check_is_client_have_data_to_read_in_fifo(s)
                            else:
                                self._client_have_data_to_read_in_fifo(s)

                if __debug__:
                    # test_trigger() returns None or a boolean; a message is printed only when it
                    # is not None (presumably when the state has changed - confirm in the trigger).
                    read_is_forbidden_test = self.show_inform_about_read_stop_because_of_in_buffer_size_limit.test_trigger(
                        read_is_forbidden)
                    if read_is_forbidden_test is not None:
                        if read_is_forbidden_test:
                            print('Read is suppressed until data will be processed.')
                        else:
                            print('Read is allowed: data is processed.')

                # Handle outputs
                for s in writable:
                    curr_client_info = self._connections[self._connection_by_conn[s]]
                    self._write_data_to_socket(curr_client_info)
                    # self._write_data_to_socket(s)

                # Handle "exceptional conditions"
                for s in exceptional:
                    self._handle_connection_error(s)

                # Set parameters for inline processors
                if self._clients_with_inline_processors_that_need_to_apply_parameters:
                    for ec_id in self._clients_with_inline_processors_that_need_to_apply_parameters:
                        expected_client_info = self._expected_clients[ec_id]
                        connection_info = expected_client_info._Client__connection
                        self._inline_processor__apply_parameters(connection_info, expected_client_info)
                    self._clients_with_inline_processors_that_need_to_apply_parameters.clear()

                # Close sockets
                if self._connections_marked_to_be_closed_immediately:
                    # The attribute is rebound to a new set before iterating, so the loop walks
                    # the old set object while close_connection() works on the new one.
                    sockets_should_be_closed_immediately = self._connections_marked_to_be_closed_immediately
                    self._connections_marked_to_be_closed_immediately = set()
                    for closeable_socket in sockets_should_be_closed_immediately:
                        connection_id = self._connection_by_conn[closeable_socket]
                        # self.close_connection_by_conn(closeable_socket)
                        self.close_connection(connection_id)
                        self._inline_processor__on__connection_lost_by_connection_id(connection_id)

                # Removing clients
                if self._connections_marked_as_ready_to_be_deleted:
                    # Same swap-then-iterate approach as for the sockets closed above.
                    clients_ready_to_be_deleted = self._connections_marked_as_ready_to_be_deleted
                    self._connections_marked_as_ready_to_be_deleted = set()
                    for faulty_socket in clients_ready_to_be_deleted:
                        self.remove_connection_by_conn(faulty_socket)

                # Output back-pressure: another pass would be requested while the amount of
                # non-deletable output data exceeds the configured output size limit.
                if (self.global_out__data_full_size.result - self.global_out__deletable_data_full_size.result) \
                        <= self.global_out__data_size_limit.result:
                    need_to_repeat = False
                else:
                    need_to_repeat = True

                # NOTE(review): this unconditional assignment overrides the result computed just
                # above, so the while loop always runs exactly once and the debug trigger below
                # always receives False. Looks like the repeat feature was disabled on purpose -
                # confirm before removing either piece.
                need_to_repeat = False

                if __debug__:
                    need_to_repeat_show = self.show_inform_about_work_stop_because_of_out_buffer_size_limit.test_trigger(
                        need_to_repeat)
                    if need_to_repeat_show is not None:
                        if need_to_repeat_show:
                            print('Work is suppressed until data will be out.')
                        else:
                            print('Work is allowed: data is out.')

        self._io_iteration_result = IoIterationResult()
        return result

:param timeout: timeout in seconds :return:

def listen(self, backlog=1):
663    def listen(self, backlog=1):
664        # backlog = backlog or 1
665
666        new_connection_settings = set()
667        for gate_connection_settings in self.gates_connections_settings:
668            gate = None
669            try:
670                gate = socket.socket(gate_connection_settings.socket_family, gate_connection_settings.socket_type,
671                                     gate_connection_settings.socket_protocol, gate_connection_settings.socket_fileno)
672                self.socket_by_fd[gate.fileno()] = gate
673                gate.setblocking(0)
674                # gate.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
675            except (socket.error, OSError) as err:
676                gate = None
677                if __debug__: self._log('EXCEPTION: GATE: LISTEN: CREATE SOCKET: {}, {}'.format(
678                    err.errno, err.strerror))
679                continue
680
681            if self.reuse_gate_port:
682                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
683
684            try:
685                self._check_for_initial_af_unix_socket_unlink(gate_connection_settings)
686                gate.bind(gate_connection_settings.socket_address)
687            except (socket.error, OSError) as err:
688                del self.socket_by_fd[gate.fileno()]
689                gate.close()
690                gate = None
691                if __debug__: self._log('EXCEPTION: GATE: BIND:"{}", {}, {}'.format(
692                    gate_connection_settings.socket_address, err.errno, err.strerror))
693                continue
694            try:
695                gate.listen(backlog)
696            except (socket.error, OSError) as err:
697                del self.socket_by_fd[gate.fileno()]
698                gate.close()
699                gate = None
700                if __debug__: self._log('EXCEPTION: GATE: LISTEN:"{}", {}, {}'.format(
701                    gate_connection_settings.socket_address, err.errno, err.strerror))
702                continue
703
704            if self.reuse_gate_addr:
705                gate.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
706
707            self._input_check_sockets.add(gate)
708            self._exception_check_sockets.add(gate)
709
710            if gate:
711                self._gate.add(gate)
712                if gate_connection_settings.socket_family in INET_TYPE_CONNECTIONS:
713                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address[0])
714                elif socket.AF_UNIX == gate_connection_settings.socket_family:
715                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
716                else:
717                    self.set_of_gate_addresses.add(gate_connection_settings.socket_address)
718                    self._log('WARNING: GATE: SAVE CONNECTION ADDRESS: UNKNOWN SOCKET FAMILY')
719                self._connection_settings_by_gate_conn[gate] = gate_connection_settings
720                self._we_have_connections_for_select = True
721                new_connection_settings.add(gate_connection_settings)
722            else:
723                self.faulty_connection_settings.add(gate_connection_settings)
724        self.gates_connections_settings = new_connection_settings
725
726        return len(self.gates_connections_settings)
def close_all_connections(self):
728    def close_all_connections(self):
729        if __debug__: self._log('CLOSE ALL CONNECTIONS:')
730        clients_list = dict(self._connections)
731        for connection_id, client_info in clients_list.items():
732            self.close_connection(connection_id)
def remove_all_connections(self):
734    def remove_all_connections(self):
735        clients_list = dict(self._connections)
736        for connection_id, client_info in clients_list.items():
737            self.remove_connection(connection_id)
def close(self):
739    def close(self):
740        for gate in self._gate:
741            del self.socket_by_fd[gate.fileno()]
742            gate.close()
743
744            if gate in self._input_check_sockets:
745                self._input_check_sockets.remove(gate)
746            if gate in self._exception_check_sockets:
747                self._exception_check_sockets.remove(gate)
748
749            if not self._input_check_sockets:
750                self._we_have_connections_for_select = False
751        self._unlink_good_af_unix_sockets()
def destroy(self):
753    def destroy(self):
754        self.close()
755        self.close_all_connections()
756        self.remove_all_connections()
def add_client( self, expected_client_info: Client):
758    def add_client(self, expected_client_info: Client):
759        """
760        Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в
761        будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте.
762        При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет
763        зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления.
764        Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий -
765        перед закрытием и уничтожением сервера) цикл обработки io_iteration().
766        :param expected_client_info: link to Client()
767        :return: expected_client_id
768        """
769        if (expected_client_info.connection_settings.keyword is None) \
770                and (ConnectionDirectionRole.client == expected_client_info.connection_settings.direction_role):
771            raise Exception('Keyword in Client.connection_settings should not be None for a Client connection!')
772
773        if expected_client_info.connection_settings.keyword in self._keywords_for_expected_clients:
774            raise Exception('Expected Client with keyword "{}" is already registered!'.format(
775                expected_client_info.connection_settings.keyword))
776
777        expected_client_info.id = self._expected_clients_id_gen()
778
779        if self.unexpected_clients_are_allowed:
780            if expected_client_info.connection_settings.keyword in self._keywords_of_unexpected_clients:
781                # клиент уже подключен
782                unexpected_client_id = self._keywords_of_unexpected_clients[
783                    expected_client_info.connection_settings.keyword]
784                unexpected_client_info = self._unexpected_clients[unexpected_client_id]
785                connection_info = expected_client_info._Client__connection
786                if (
787                    unexpected_client_info.connection_settings.direction_role == expected_client_info.connection_settings.direction_role) and \
788                        (ConnectionDirectionRole.client == unexpected_client_info.connection_settings.direction_role):
789                    # Произошел запрос на подключение к клиенту, и клиент с таким же ключевым словом уже
790                    # подключен (с сервером этого быть не должно, и может произойти только при неверном ручном изменении
791                    # внутренних данных объекта класса ASockIOCore). Необходимо переиспользовать уже имеющееся
792                    # подключение.
793                    # В случае если все же тут оказался соккет подключенный к супер-серверу - он будет автоматически
794                    # отключен и соединение будет установлено в новом сокете.
795                    expected_client_info.connection_id = unexpected_client_info.connection_id
796                    expected_client_info._Client__connection = \
797                        unexpected_client_info._Client__connection
798                    self._conns_of_expected_clients[connection_info.conn.result] = expected_client_info.id
799                    connection_info.connected_expected_client_id = expected_client_info.id
800                    connection_info.connected_expected_client = expected_client_info
801                    self._io_iteration_result.newly_connected_expected_clients.add(expected_client_info.id)
802                else:
803                    # Произошел запрос на подключение к супер-серверу, но клиент с таким же ключевым словом уже
804                    # подключен (или наоборот). Или же просто имеется установленное соединение с супер-сервером.
805                    # Необходимо его отключить.
806                    self._mark_connection_to_be_closed_immediately(connection_info)
807                    self._mark_connection_as_ready_for_deletion(connection_info)
808                self._remove_unexpected_client(unexpected_client_id)
809            else:
810                # клиент еще не подключен
811                expected_client_info.connection_id = None
812                expected_client_info._Client__connection = None
813
814        if ConnectionDirectionRole.server == expected_client_info.connection_settings.direction_role:
815            self._connect_to_super_server(expected_client_info)
816
817        self._keywords_for_expected_clients[expected_client_info.connection_settings.keyword] = expected_client_info.id
818        self._expected_clients[expected_client_info.id] = expected_client_info
819
820        return expected_client_info.id

Добавляет новый expected client в список. Это может быть как клиент (который сам подключился или подключится в будущем), так и супер-сервер, попытка подключения к которому будет осуществлена тут же - на месте. При этом если произойдет какая-либо ошибка при подключении к супер-серверу - expected client не будет зарегистрирован. Однако client может быть создан. В случае ошибки он будет помечен для закрытия и удаления. Поэтому исключения нужно перехватывать, и после этого проводить как минимум один (как минимум завершающий - перед закрытием и уничтожением сервера) цикл обработки io_iteration(). :param expected_client_info: link to Client() :return: expected_client_id

def get_client_id_by_keyword(self, expected_client_keyword):
822    def get_client_id_by_keyword(self, expected_client_keyword):
823        """
824        :param expected_client_keyword: expected_client_keyword
825        :return: link to Client()
826        """
827        return self._keywords_for_expected_clients[expected_client_keyword]

:param expected_client_keyword: keyword the expected client was registered with :return: expected client ID

def get_client_info(self, expected_client_id):
829    def get_client_info(self, expected_client_id):
830        """
831        :param expected_client_id: expected_client_id
832        :return: link to Client()
833        """
834        return self._expected_clients[expected_client_id]

:param expected_client_id: expected_client_id :return: link to Client()

def get_connection_input_fifo_size_for_client(self, expected_client_id):
836    def get_connection_input_fifo_size_for_client(self, expected_client_id):
837        expected_client_info = self._expected_clients[expected_client_id]
838        connection_info = expected_client_info._Client__connection
839        if connection_info is None:
840            raise Exception('Expected client was not connected yet!')
841        # if client_info.this_is_raw_connection:
842        #     if client_info.input_from_client.size():
843        #         return 1
844        #     else:
845        #         return 0
846        # else:
847        #     return client_info.input_from_client.size()
848        return connection_info.input_from_client.size()
def get_message_from_client(self, expected_client_id):
850    def get_message_from_client(self, expected_client_id):
851        expected_client_info = self._expected_clients[expected_client_id]
852        connection_info = expected_client_info._Client__connection
853        if connection_info is None:
854            raise Exception('Expected client was not connected yet!')
855        if not connection_info.input_from_client.size():
856            raise Exception('There is no readable data in expected client\'s FIFO!')
857        # if client_info.this_is_raw_connection:
858        #     self._consolidate_raw_messages_in_input_from_client_fifo(client_info)
859        return connection_info.input_from_client.get()
def get_messages_from_client(self, expected_client_id):
862    def get_messages_from_client(self, expected_client_id):
863        expected_client_info = self._expected_clients[expected_client_id]
864        connection_info = expected_client_info._Client__connection
865        if connection_info is None:
866            raise Exception('Expected client was not connected yet!')
867        try:
868            while True:
869                yield connection_info.input_from_client.get()
870        except FIFOIsEmpty:
871            pass
872            # while client_info.input_from_client.size():
873            #     yield client_info.input_from_client.get()
def get_connection_output_fifo_size_for_client(self, expected_client_id):
875    def get_connection_output_fifo_size_for_client(self, expected_client_id):
876        expected_client_info = self._expected_clients[expected_client_id]
877        connection_info = expected_client_info._Client__connection
878        if connection_info is None:
879            raise Exception('Expected client was not connected yet!')
880        return connection_info.output_to_client.size()
def send_message_to_client(self, expected_client_id, data):
882    def send_message_to_client(self, expected_client_id, data):
883        # data = bytes(data)
884        expected_client_info = self._expected_clients[expected_client_id]
885        connection_info = expected_client_info._Client__connection
886        if connection_info is None:
887            raise Exception('Expected client was not connected yet!')
888        if connection_info.this_is_raw_connection:
889            self._send_message_through_connection_raw(connection_info, data)
890        else:
891            self._send_message_through_connection(connection_info, data)
def send_messages_to_client(self, expected_client_id, messages_list):
893    def send_messages_to_client(self, expected_client_id, messages_list):
894        # data = bytes(data)
895        expected_client_info = self._expected_clients[expected_client_id]
896        connection_info = expected_client_info._Client__connection
897        if connection_info is None:
898            raise Exception('Expected client was not connected yet!')
899        if connection_info.this_is_raw_connection:
900            self._send_messages_through_connection_raw(connection_info, messages_list)
901        else:
902            self._send_messages_through_connection(connection_info, messages_list)
def check_is_client_is_in_raw_connection_mode(self, expected_client_id):
904    def check_is_client_is_in_raw_connection_mode(self, expected_client_id):
905        expected_client_info = self._expected_clients[expected_client_id]
906        connection_info = expected_client_info._Client__connection
907        if connection_info is None:
908            raise Exception('Expected client was not connected yet!')
909        return connection_info.this_is_raw_connection
def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool):
911    def switch_client_raw_connection_mode(self, expected_client_id, is_raw: bool):
912        expected_client_info = self._expected_clients[expected_client_id]
913        connection_info = expected_client_info._Client__connection
914        if connection_info is None:
915            raise Exception('Expected client was not connected yet!')
916        connection_info.this_is_raw_connection = is_raw
def set_inline_processor_for_client( self, expected_client_id, class_for_unknown_clients_inline_processing: type):
918    def set_inline_processor_for_client(self, expected_client_id,
919                                        class_for_unknown_clients_inline_processing: type):
920        expected_client_info = self._expected_clients[expected_client_id]
921        connection_info = expected_client_info._Client__connection
922        if connection_info is None:
923            raise Exception('Expected client was not connected yet!')
924        self._set_inline_processor_for_client(connection_info, expected_client_info,
925                                              class_for_unknown_clients_inline_processing)
def close_client_connection(self, expected_client_id, raise_if_already_closed=True):
927    def close_client_connection(self, expected_client_id, raise_if_already_closed=True):
928        """
929        Connection will be closed immediately (inside this method)
930        :param expected_client_id:
931        :param raise_if_already_closed:
932        :return:
933        """
934        if __debug__: self._log('CLOSE EXPECTED CLIENT SOCKET:')
935        expected_client_info = self._expected_clients[expected_client_id]
936        connection_info = expected_client_info._Client__connection
937        if raise_if_already_closed and (connection_info is None):
938            raise Exception('Expected client was not connected yet!')
939        self.close_connection(connection_info.id)

Connection will be closed immediately (inside this method) :param expected_client_id: :param raise_if_already_closed: :return:

def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id, raise_if_already_closed=True):
941    def mark_client_connection_as_should_be_closed_immediately(self, expected_client_id,
942                                                               raise_if_already_closed=True):
943        """
944        Connection will be closed immediately (inside main IO loop)
945        :param expected_client_id:
946        :param raise_if_already_closed:
947        :return:
948        """
949        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS SHOULD BE CLOSED IMMEDIATELY:')
950        expected_client_info = self._expected_clients[expected_client_id]
951        connection_info = expected_client_info._Client__connection
952        if raise_if_already_closed and (connection_info is None):
953            raise Exception('Expected client was not connected yet!')
954        self._mark_connection_to_be_closed_immediately(connection_info)

Connection will be closed immediately (inside main IO loop) :param expected_client_id: :param raise_if_already_closed: :return:

def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True):
956    def mark_client_connection_as_ready_to_be_closed(self, expected_client_id, raise_if_already_closed=True):
957        """
958        Connection will be closed when all output will be sent (inside main IO loop).
959        :param expected_client_id:
960        :param raise_if_already_closed:
961        :return:
962        """
963        if __debug__: self._log('MARK EXPECTED CLIENT SOCKET AS READY TO BE CLOSED:')
964        expected_client_info = self._expected_clients[expected_client_id]
965        connection_info = expected_client_info._Client__connection
966        if raise_if_already_closed and (connection_info is None):
967            raise Exception('Expected client was not connected yet!')
968        self._mark_connection_as_ready_to_be_closed(connection_info)

Connection will be closed when all output will be sent (inside main IO loop). :param expected_client_id: :param raise_if_already_closed: :return:

def remove_client(self, expected_client_id):
970    def remove_client(self, expected_client_id):
971        if __debug__: self._log('REMOVE EXPECTED CLIENT: {}'.format(expected_client_id))
972        expected_client_info = self._expected_clients[expected_client_id]
973        if __debug__: self._log('\tWITH KEYWORD: {}'.format(expected_client_info.connection_settings.keyword))
974        connection_id = expected_client_info.connection_id
975        if connection_id is None:
976            self._remove_client(expected_client_id)
977        self.remove_connection(connection_id)
def add_connection(self, conn, address):
 979    def add_connection(self, conn, address):
 980        """
 981        :param conn: socket
 982        :param address: address
 983        :return: client ID
 984        """
 985        if conn is None:
 986            raise TypeError('conn should not be None!')
 987
 988        self.socket_by_fd[conn.fileno()] = conn
 989        conn.setblocking(0)
 990        if self.use_nodelay_inet and (conn.family in INET_TYPE_CONNECTIONS):
 991            conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 992
 993        new_client_id = self._connections_id_gen()
 994
 995        client_info = Connection(new_client_id, (conn, address), self)
 996        self._connections[new_client_id] = client_info
 997        self._connection_by_conn[conn] = new_client_id
 998        self._input_check_sockets.add(conn)
 999        self._exception_check_sockets.add(conn)
1000        self._we_have_connections_for_select = True
1001
1002        self._unconfirmed_clients.add(conn)
1003
1004        return new_client_id

:param conn: socket :param address: address :return: client ID

def check_connection_existance(self, connection_id):
1006    def check_connection_existance(self, connection_id):
1007        if connection_id not in self._expected_clients:
1008            return False
1009        if connection_id not in self._connections:
1010            return False
1011        client_info = self._connections[connection_id]
1012        if not client_info.conn.existence:
1013            return False
1014        conn = client_info.conn.result
1015        if conn is None:
1016            return False
1017        return True
def close_connection(self, connection_id):
1019    def close_connection(self, connection_id):
1020        if __debug__: self._log('CLOSE CLIENT {}:'.format(connection_id))
1021        client_info = self._connections[connection_id]
1022        if not client_info.conn.existence:
1023            if __debug__: self._log('CLIENT {} CONN IS NOT SET.'.format(connection_id))
1024            return
1025        conn = client_info.conn.result
1026        if conn is None:
1027            if __debug__: self._log('CLIENT {} CONN IS NONE.'.format(connection_id))
1028            return
1029
1030        del self.socket_by_fd[conn.fileno()]
1031        conn.close()
1032        client_info.conn.existence = False
1033        client_info.output_to_client = copy.copy(client_info.output_to_client)  # clear all output data to free some
1034        #   memory even before destroying
1035
1036        if connection_id in self._connections_marked_as_ready_to_be_closed:
1037            self._connections_marked_as_ready_to_be_closed.remove(connection_id)
1038        if conn in self._connections_marked_to_be_closed_immediately:
1039            self._connections_marked_to_be_closed_immediately.remove(conn)
1040        if conn in self._connection_by_conn:
1041            del self._connection_by_conn[conn]
1042        if conn in self._input_check_sockets:
1043            self._input_check_sockets.remove(conn)
1044        if conn in self._output_check_sockets:
1045            self._output_check_sockets.remove(conn)
1046        if conn in self._exception_check_sockets:
1047            self._exception_check_sockets.remove(conn)
1048
1049        if not self._input_check_sockets:
1050            self._we_have_connections_for_select = False
1051
1052        if __debug__: self._log('CLIENT {} NORMALLY CLOSED.'.format(connection_id))
def close_connection_by_conn(self, conn):
1054    def close_connection_by_conn(self, conn):
1055        # Если conn не в списке - вылетет ошибка. Это предотвратит ошибочное закрытие незарегистрированного сокета.
1056        # И мы сможем обнаружить наличие соответствующей ошибки в коде.
1057        if __debug__: self._log('CLOSE CLIENT BY CONN: {}'.format(repr(conn)))
1058        connection_id = self._connection_by_conn[conn]
1059        if __debug__: self._log('\t WITH CLIENT ID: {}'.format(connection_id))
1060        self.close_connection(connection_id)
def remove_connection(self, connection_id):
1062    def remove_connection(self, connection_id):
1063        # client should NOT be removed immediately after connection close (close_connection):
1064        # code should do it by itself after reading all available input data
1065        if __debug__: self._log('REMOVE CLIENT: {}'.format(connection_id))
1066        client_info = self._connections[connection_id]
1067        if __debug__: self._log('\tWITH KEYWORD: {}'.format(client_info.keyword))
1068        if client_info.conn.existence:
1069            self.close_connection(connection_id)
1070        conn = client_info.conn.result
1071        if conn is None:
1072            return
1073
1074        if conn in self._conns_of_unexpected_clients:
1075            self._remove_unexpected_client(self._conns_of_unexpected_clients[conn])
1076
1077        # if conn in self._conns_of_expected_clients:
1078        #     self._remove_client(self._conns_of_expected_clients[conn])
1079        if client_info.connected_expected_client_id is not None:
1080            self._remove_client(client_info.connected_expected_client_id)
1081
1082        client_info.connected_expected_client_id = None
1083        client_info.connected_expected_client = None
1084
1085        if connection_id in self._connections_marked_as_ready_to_be_deleted:
1086            self._connections_marked_as_ready_to_be_deleted.remove(connection_id)
1087
1088        client_info.conn.existence = False
1089        client_info.conn.result = None
1090
1091        del self._connections[connection_id]
1092        if conn in self._connection_by_conn:
1093            del self._connection_by_conn[conn]
1094        if conn in self._input_check_sockets:
1095            self._input_check_sockets.remove(conn)
1096        if conn in self._output_check_sockets:
1097            self._output_check_sockets.remove(conn)
1098        if conn in self._exception_check_sockets:
1099            self._exception_check_sockets.remove(conn)
1100
1101            # client_info.remove()
def remove_connection_by_conn(self, conn):
1103    def remove_connection_by_conn(self, conn):
1104        connection_id = self._connection_by_conn[conn]
1105        self.remove_connection(connection_id)
Inherited Members
cengal.io.asock_io.versions.v_0.base.IOCoreMemoryManagement
global__data_size_limit
global_in__data_size_limit
global_in__data_full_size
global_in__deletable_data_full_size
global_out__data_size_limit
global_out__data_full_size
global_out__deletable_data_full_size
class ThereAreNoGateConections(builtins.Exception):
class ThereAreNoGateConections(Exception):
    """Raised by ``asock_io_core_connect`` when gate connections are required
    but ``listen()`` reported none."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class NotEnoughGateConnections(builtins.Exception):
class NotEnoughGateConnections(Exception):
    """Raised by ``asock_io_core_connect`` when every configured gate connection
    is required but ``listen()`` reported a different number of them."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
@contextmanager
def asock_io_core_connect( asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool = False, backlog: int = 1, should_have_all_desired_gate_connections: bool = False):
@contextmanager
def asock_io_core_connect(asock_io_core_obj: ASockIOCore, should_have_gate_connections: bool=False, backlog: int=1,
                          should_have_all_desired_gate_connections: bool=False):
    """Start listening on ``asock_io_core_obj`` for the duration of a ``with`` block.

    On entry ``asock_io_core_obj.listen(backlog)`` is called and its result is
    treated as the number of successfully created gate connections. On exit -
    whether the block finished normally, raised, or the start-up checks below
    failed - an IO iteration is run, the object is destroyed and one more IO
    iteration is run.

    :param asock_io_core_obj: the IO core to start and, finally, destroy.
    :param should_have_gate_connections: if true, fail when ``listen()`` reports
        no gate connections at all.
    :param backlog: value passed through to ``listen()``.
    :param should_have_all_desired_gate_connections: if true, fail when the number
        reported by ``listen()`` differs from
        ``len(asock_io_core_obj.gates_connections_settings)``.
    :raises ThereAreNoGateConections: see ``should_have_gate_connections``.
    :raises NotEnoughGateConnections: see ``should_have_all_desired_gate_connections``.
    :return: yields ``asock_io_core_obj`` itself.
    """
    # No ``except`` clause is needed: exceptions propagate on their own and the
    # ``finally`` block still performs the cleanup.
    try:
        desired_amount_of_gate_connections = len(asock_io_core_obj.gates_connections_settings)
        gate_connections_num = asock_io_core_obj.listen(backlog)

        if should_have_gate_connections and (not gate_connections_num):
            error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: THERE IS NO GOOD GATE CONNECTIONS!'
            asock_io_core_obj._log(error_text)
            raise ThereAreNoGateConections(error_text)

        if should_have_all_desired_gate_connections and (desired_amount_of_gate_connections != gate_connections_num):
            error_text = 'ERROR: CONTEXTMANAGER: BASIC INITIATION: NOT ENOUGH GOOD GATE CONNECTIONS!'
            asock_io_core_obj._log(error_text)
            raise NotEnoughGateConnections(error_text)

        print('THERE ARE CREATED {} GOOD GATE CONNECTIONS'.format(gate_connections_num))
        yield asock_io_core_obj
    finally:
        # An IO iteration is run both before and after destroy().
        asock_io_core_obj.io_iteration()
        asock_io_core_obj.destroy()
        asock_io_core_obj.io_iteration()
        if __debug__: print('RECV BUFF SIZES: {}'.format(str(asock_io_core_obj.recv_buff_sizes)[:150]))
        if __debug__: print('RECV SIZES: {}'.format(str(asock_io_core_obj.recv_sizes)[:150]))
class CheckIsRawConnection:
class CheckIsRawConnection:
    """Callable check: does a connection look like a RAW connection of an unknown client?"""

    def __call__(self, asock_io_core: ASockIOCore, connection_info: Connection)->bool:
        """
        Inspect the socket family of ``connection_info.conn.result`` and the first
        item of ``connection_info.addr.result``.

        :param asock_io_core: object whose ``set_of_gate_addresses`` is consulted.
        :param connection_info: connection to inspect.
        :return: "True" if it is a RAW connection for an Unknown Client, i.e. an
            AF_INET/AF_INET6 socket whose address is not one of the gate
            addresses. "False" otherwise, including when the inspection itself
            fails with an ordinary exception.
        """
        try:
            if connection_info.conn.result.family not in {socket.AF_INET, socket.AF_INET6}:
                return False
            # True only if connected not from a local (gate) IP address.
            return connection_info.addr.result[0] not in asock_io_core.set_of_gate_addresses
        except Exception:
            # Best-effort check: incomplete connection info (e.g. a missing socket
            # or address) means "not a RAW connection". Only ``Exception`` is
            # caught so that KeyboardInterrupt/SystemExit are not swallowed.
            return False