cengal.io.used_ports.versions.v_0.used_ports
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Table of port-number registrations and helpers for searching it.

The registrations are loaded from pickled tables shipped next to this module
(``data/table_<name>.pickle``) and merged into one per-port table by
``UsedPorts``. ``PortsIterator`` searches that table for runs of consecutive
ports whose registration status is acceptable for a given protocol.

Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = ['PortStatus', 'Table', 'Protocol', 'purify_ports', 'get_tables', 'used_ports', 'PortLord', 'PortInfo', 'UsedPorts', 'PortsIterator', 'unify_ports', 'UnaceptablePortsRangeTypeError']

import pickle
from enum import Enum
from typing import Dict, Optional, Set, List, Tuple, Union, Any
from collections import namedtuple
from cengal.file_system.path_manager import path_relative_to_src


class PortStatus(str, Enum):
    """Registration status of a protocol on a port."""

    # In programming APIs (not in communication between hosts), requests a system-allocated (dynamic) port
    na = 'N/A'
    # Described protocol is assigned for this port by IANA and is standardized, specified, or widely used for such.
    yes = 'Yes'
    # Described protocol is not assigned for this port by IANA but is standardized, specified, or widely used for such.
    unofficial = 'Unofficial'
    # Described protocol is assigned by IANA for this port, but is not standardized, specified, or widely used for such.
    assigned = 'Assigned'
    # Described protocol is not assigned by IANA, standardized, specified, or widely used for the port.
    no = 'No'
    # Port is reserved by IANA, generally to prevent collision having its previous use removed.
    # The port number may be available for assignment upon request to IANA.
    reserved = 'Reserved'


class Table(str, Enum):
    """Names of the port tables; ``UsedPorts.tables`` maps each to its port range."""

    system = 'system'
    user = 'user'
    ephemeral = 'ephemeral'


class Protocol(Enum):
    """Transport protocols; each value is the index of that protocol's field in ``PortLord``."""

    tcp = 0
    udp = 1
    sctp = 2
    dccp = 3


# Lazily populated caches; see get_tables() and used_ports().
_tables: Optional[Dict[Table, List[List]]] = None
_known_tables_names = {'system', 'user', 'ephemeral'}
_used_ports: Optional['UsedPorts'] = None


def _purify_item(item: Optional[str]) -> PortStatus:
    """Convert a raw table cell to a ``PortStatus``; a missing cell means ``PortStatus.no``.

    Raises:
        ValueError: ``item`` is not the value of any ``PortStatus`` member.
    """
    return PortStatus.no if item is None else PortStatus(item)


def get_tables() -> Dict[Table, List[List]]:
    """Return the raw port tables, loading them from disk on first use.

    Each table is unpickled from ``data/table_<name>.pickle`` (resolved with
    ``path_relative_to_src``) and the result is cached in the module-level
    ``_tables``, so later calls return the same dict without touching the disk.

    Returns:
        Mapping from ``Table`` to its list of rows.
    """
    global _tables
    if _tables is not None:
        return _tables

    loaded: Dict[Table, List[List]] = {}
    for name in _known_tables_names:
        # NOTE: pickle is only safe here because these files ship with the
        # package; never point this at data from an untrusted source.
        with open(path_relative_to_src(f'data/table_{name}.pickle'), 'rb') as file:
            loaded[Table(name)] = pickle.load(file)

    _tables = loaded
    return _tables


def used_ports() -> 'UsedPorts':
    """Return the shared ``UsedPorts`` instance, building it on first use."""
    global _used_ports
    if _used_ports is None:
        _used_ports = UsedPorts()

    return _used_ports


# One registration of a port: a status per protocol plus a description.
# The field order matches the values of ``Protocol``.
PortLord = namedtuple('PortLord', 'tcp udp sctp dccp description')


class PortInfo:
    """Set of ``PortLord`` registrations recorded for one port number."""

    def __init__(self, port: int, port_lords: Optional[Set[PortLord]] = None) -> None:
        """Store ``port`` and its registrations.

        Args:
            port: Port number.
            port_lords: Existing set of registrations to use; a new empty set
                is created when omitted.
        """
        self.port: int = port
        # Test for None, not truthiness: ``port_lords or set()`` would replace
        # a caller-supplied empty set with a different object.
        self.port_lords: Set[PortLord] = set() if port_lords is None else port_lords

    def add(self, port_lord: PortLord) -> int:
        """Add a registration and return how many distinct ones are now stored."""
        self.port_lords.add(port_lord)
        return len(self.port_lords)


class UsedPorts:
    """Per-port table of registrations built from the raw tables of ``get_tables()``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self) -> None:
        """Build the table for ports 0..65535 and fill it from ``get_tables()``."""
        # table[n] holds the registrations of port n.
        self.table: List[PortInfo] = [PortInfo(number) for number in range(65536)]
        # Port range covered by each named table.
        self.tables = {
            Table.system: slice(0, 1024),
            Table.user: slice(1024, 49152),
            Table.ephemeral: slice(49152, 65536),
        }
        for rows in get_tables().values():
            for port_or_range, tcp, udp, sctp, dccp, description in rows:
                port_lord = PortLord(
                    _purify_item(tcp),
                    _purify_item(udp),
                    _purify_item(sctp),
                    _purify_item(dccp),
                    description,
                )
                if isinstance(port_or_range, tuple):
                    # A tuple row is an inclusive (first, last) range of ports.
                    first = int(port_or_range[0])
                    last = int(port_or_range[1])
                else:
                    first = last = int(port_or_range)

                for number in range(first, last + 1):
                    self.table[number].add(port_lord)

        # Ports absent from every raw table get one explicit "not used" entry,
        # so each port has at least one registration to inspect.
        not_used_port = PortLord(PortStatus.no, PortStatus.no, PortStatus.no, PortStatus.no, '')
        for port_info in self.table:
            if not port_info.port_lords:
                port_info.port_lords.add(not_used_port)

    def port(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]]) -> 'PortsIterator':
        """Return an iterator over single acceptable ports in ``port_or_range``."""
        return PortsIterator(self, protocol, port_or_range, statuses, 1)

    def range(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Return an iterator over runs of ``desired_number_of_ports`` acceptable ports."""
        return PortsIterator(self, protocol, port_or_range, statuses, desired_number_of_ports)

    def __call__(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Same as ``range()``."""
        return self.range(protocol, port_or_range, statuses, desired_number_of_ports)


class UnaceptablePortsRangeTypeError(Exception):
    """Raised by ``PortsIterator`` when ``port_or_range`` is not an int, slice or tuple."""


class PortsIterator:
    """Search a ``UsedPorts`` table for runs of consecutive acceptable ports.

    A port is acceptable when, for the chosen protocol, the status of every
    registration recorded for it is contained in ``statuses``. Every result is
    a ``slice`` of port numbers (``stop`` exclusive).

    ``ports_range`` bounds the *first* port of a run; the run itself may extend
    past ``ports_range.stop`` but never past the end of the port table.
    ``first_port`` is the search cursor and can be moved with ``shift*``/``put*``.
    """

    def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
        """Prepare a search.

        Args:
            used_ports: Table to search.
            protocol: Protocol whose status is checked.
            port_or_range: A single port, a slice, or a tuple that is passed
                to ``slice(*port_or_range)`` (so its second item is exclusive).
            statuses: One acceptable status or a set of them.
            desired_number_of_ports: Length of the run of ports to look for.

        Raises:
            UnaceptablePortsRangeTypeError: ``port_or_range`` has another type.
        """
        self.used_ports: UsedPorts = used_ports
        self.protocol: Protocol = protocol
        if isinstance(port_or_range, int):
            ports_range = slice(port_or_range, port_or_range + 1)
        elif isinstance(port_or_range, slice):
            ports_range = port_or_range
        elif isinstance(port_or_range, tuple):
            ports_range = slice(*port_or_range)
        else:
            raise UnaceptablePortsRangeTypeError

        self.ports_range: slice = ports_range

        if isinstance(statuses, PortStatus):
            statuses = {statuses}

        self.statuses: Union[PortStatus, Set[PortStatus]] = statuses
        self.desired_number_of_ports: int = desired_number_of_ports
        self.first_port: int = self._range_start()

    def __iter__(self):
        """Rewind the cursor to the start of ``ports_range`` and return ``self``."""
        self.first_port = self._range_start()
        return self

    def __next__(self):
        """Return the next run, then advance the cursor by one port.

        Because the cursor moves by one, consecutive results may overlap.
        """
        ports_range = self._iter_impl()
        self.first_port += 1
        return ports_range

    def _range_start(self) -> int:
        """Return the first port of ``ports_range``; an open start means port 0."""
        start = self.ports_range.start
        return 0 if start is None else start

    def _first_rejected_port(self, first: int, count: int) -> Optional[int]:
        """Return the first unacceptable port in ``first .. first + count - 1``, or None."""
        # Protocol values are the positions of the per-protocol fields in PortLord.
        status_index = self.protocol.value
        table = self.used_ports.table
        for port in range(first, first + count):
            for port_lord in table[port].port_lords:
                if port_lord[status_index] not in self.statuses:
                    return port

        return None

    def _iter_impl(self) -> slice:
        """Find the next run starting at or after ``first_port``.

        On success ``first_port`` is left at the start of the returned run.

        Raises:
            StopIteration: No further run exists.
        """
        table_size = len(self.used_ports.table)
        stop = self.ports_range.stop
        if stop is None or stop > table_size:
            # Open-ended or oversized ranges end with the table.
            stop = table_size

        wanted = self.desired_number_of_ports
        # The second condition keeps the run inside the table: without it a
        # search near the last port would index past the end of the table.
        while self.first_port < stop and self.first_port + wanted <= table_size:
            rejected = self._first_rejected_port(self.first_port, wanted)
            if rejected is None:
                # A non-positive ``wanted`` checks nothing and yields one port,
                # as it always has.
                return slice(self.first_port, self.first_port + max(wanted, 1))

            # No run containing the rejected port can succeed; resume after it.
            self.first_port = rejected + 1

        raise StopIteration

    def __call__(self) -> Optional[slice]:
        """Return the next run without advancing past it, or None when there is none."""
        try:
            return self._iter_impl()
        except StopIteration:
            return None

    def shift_only(self, num: int = 1) -> None:
        """Move the cursor forward by ``num`` ports."""
        self.first_port += num

    def shift(self, num: int = 1) -> Optional[slice]:
        """Move the cursor forward by ``num`` ports and search from there."""
        self.first_port += num
        return self()

    def put_only(self, num: int) -> None:
        """Place the cursor on port ``num``."""
        self.first_port = num

    def put(self, num: int) -> Optional[slice]:
        """Place the cursor on port ``num`` and search from there."""
        self.first_port = num
        return self()


def purify_ports(ports_range: Optional[slice]) -> Optional[Union[int, Tuple[int, int]]]:
    """Convert a result slice to a port number or an inclusive ``(first, last)`` pair.

    Args:
        ports_range: Slice with exclusive ``stop``, or None (e.g. the "nothing
            found" result of ``PortsIterator.__call__``).

    Returns:
        None for None; the single port when the slice covers one port;
        otherwise ``(first, last)`` with ``last`` inclusive.
    """
    if ports_range is None:
        return None

    first = ports_range.start
    last = ports_range.stop - 1
    return first if first == last else (first, last)


def unify_ports(ports_range: Optional[slice]) -> Optional[Tuple[int, int]]:
    """Convert a result slice to an inclusive ``(first, last)`` pair.

    Args:
        ports_range: Slice with exclusive ``stop``, or None.

    Returns:
        None for None; otherwise ``(first, last)`` with ``last`` inclusive.
    """
    if ports_range is None:
        return None

    return ports_range.start, ports_range.stop - 1
class
PortStatus(builtins.str, enum.Enum):
class PortStatus(str, Enum):
    """Registration status of a protocol on a port."""

    # In programming APIs (not in communication between hosts), requests a system-allocated (dynamic) port
    na = 'N/A'
    # Described protocol is assigned for this port by IANA and is standardized, specified, or widely used for such.
    yes = 'Yes'
    # Described protocol is not assigned for this port by IANA but is standardized, specified, or widely used for such.
    unofficial = 'Unofficial'
    # Described protocol is assigned by IANA for this port, but is not standardized, specified, or widely used for such.
    assigned = 'Assigned'
    # Described protocol is not assigned by IANA, standardized, specified, or widely used for the port.
    no = 'No'
    # Port is reserved by IANA, generally to prevent collision having its previous use removed.
    # The port number may be available for assignment upon request to IANA.
    reserved = 'Reserved'
An enumeration.
na =
<PortStatus.na: 'N/A'>
yes =
<PortStatus.yes: 'Yes'>
unofficial =
<PortStatus.unofficial: 'Unofficial'>
assigned =
<PortStatus.assigned: 'Assigned'>
no =
<PortStatus.no: 'No'>
reserved =
<PortStatus.reserved: 'Reserved'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Table(builtins.str, enum.Enum):
An enumeration.
system =
<Table.system: 'system'>
user =
<Table.user: 'user'>
ephemeral =
<Table.ephemeral: 'ephemeral'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class
Protocol(enum.Enum):
An enumeration.
tcp =
<Protocol.tcp: 0>
udp =
<Protocol.udp: 1>
sctp =
<Protocol.sctp: 2>
dccp =
<Protocol.dccp: 3>
Inherited Members
- enum.Enum
- name
- value
def
purify_ports( ports_range: typing.Union[slice, NoneType]) -> Union[int, Tuple[int, int], NoneType]:
def get_tables() -> Dict[Table, List[List]]:
    """Return the raw port tables, loading them from disk on first use.

    Each table is unpickled from ``data/table_<name>.pickle`` (resolved with
    ``path_relative_to_src``) and the result is cached in the module-level
    ``_tables``, so later calls return the same dict without touching the disk.
    """
    global _tables
    if _tables is not None:
        return _tables

    loaded: Dict[Table, List[List]] = {}
    for name in _known_tables_names:
        pickle_path = path_relative_to_src(f'data/table_{name}.pickle')
        with open(pickle_path, 'rb') as pickle_file:
            loaded[Table(name)] = pickle.load(pickle_file)

    _tables = loaded
    return _tables
class
PortLord(builtins.tuple):
PortLord(tcp, udp, sctp, dccp, description)
PortLord(tcp, udp, sctp, dccp, description)
Create new instance of PortLord(tcp, udp, sctp, dccp, description)
Inherited Members
- builtins.tuple
- index
- count
class
PortInfo:
class PortInfo:
    """Set of ``PortLord`` registrations recorded for one port number."""

    def __init__(self, port: int, port_lords: Optional[Set['PortLord']] = None) -> None:
        """Store ``port`` and its registrations.

        Args:
            port: Port number.
            port_lords: Existing set of registrations to use; a new empty set
                is created when omitted.
        """
        self.port: int = port
        # Test for None, not truthiness: ``port_lords or set()`` would replace
        # a caller-supplied empty set with a different object, so later
        # additions would never reach the caller's set.
        self.port_lords: Set['PortLord'] = set() if port_lords is None else port_lords

    def add(self, port_lord: 'PortLord') -> int:
        """Add a registration and return how many distinct ones are now stored."""
        self.port_lords.add(port_lord)
        return len(self.port_lords)
PortInfo( port: int, port_lords: typing.Union[typing.Set[PortLord], NoneType] = None)
class
UsedPorts:
class UsedPorts:
    """Per-port table of registrations built from the raw tables of ``get_tables()``."""

    def __init__(self) -> None:
        """Build the table for ports 0..65535 and fill it from ``get_tables()``."""
        # table[n] holds the registrations of port n.
        self.table: List[PortInfo] = [PortInfo(number) for number in range(65536)]
        # Port range covered by each named table.
        self.tables = {
            Table.system: slice(0, 1024),
            Table.user: slice(1024, 49152),
            Table.ephemeral: slice(49152, 65536),
        }
        for rows in get_tables().values():
            for port_or_range, tcp, udp, sctp, dccp, description in rows:
                lord = PortLord(
                    _purify_item(tcp),
                    _purify_item(udp),
                    _purify_item(sctp),
                    _purify_item(dccp),
                    description,
                )
                if isinstance(port_or_range, tuple):
                    # A tuple row is an inclusive (first, last) range of ports.
                    first = int(port_or_range[0])
                    last = int(port_or_range[1])
                else:
                    first = last = int(port_or_range)

                for number in range(first, last + 1):
                    self.table[number].add(lord)

        # Ports absent from every raw table get one explicit "not used" entry.
        unassigned = PortLord(PortStatus.no, PortStatus.no, PortStatus.no, PortStatus.no, '')
        for info in self.table:
            if not info.port_lords:
                info.port_lords.add(unassigned)

    def port(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]]) -> 'PortsIterator':
        """Return an iterator over single acceptable ports in ``port_or_range``."""
        single_port = 1
        return PortsIterator(self, protocol, port_or_range, statuses, single_port)

    def range(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Return an iterator over runs of ``desired_number_of_ports`` acceptable ports."""
        iterator = PortsIterator(self, protocol, port_or_range, statuses, desired_number_of_ports)
        return iterator

    def __call__(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Same as ``range()``."""
        return self.range(protocol, port_or_range, statuses, desired_number_of_ports)
table: List[PortInfo]
def
port( self, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]]) -> PortsIterator:
def
range( self, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]], desired_number_of_ports: int) -> PortsIterator:
class
PortsIterator:
class PortsIterator:
    """Search a ``UsedPorts`` table for runs of consecutive acceptable ports.

    A port is acceptable when, for the chosen protocol, the status of every
    registration recorded for it is contained in ``statuses``. Every result is
    a ``slice`` of port numbers (``stop`` exclusive).

    ``ports_range`` bounds the *first* port of a run; the run itself may extend
    past ``ports_range.stop`` but never past the end of the port table.
    ``first_port`` is the search cursor and can be moved with ``shift*``/``put*``.
    """

    def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
        """Prepare a search.

        Args:
            used_ports: Table to search.
            protocol: Protocol whose status is checked.
            port_or_range: A single port, a slice, or a tuple that is passed
                to ``slice(*port_or_range)`` (so its second item is exclusive).
            statuses: One acceptable status or a set of them.
            desired_number_of_ports: Length of the run of ports to look for.

        Raises:
            UnaceptablePortsRangeTypeError: ``port_or_range`` has another type.
        """
        self.used_ports: UsedPorts = used_ports
        self.protocol: Protocol = protocol
        if isinstance(port_or_range, int):
            ports_range = slice(port_or_range, port_or_range + 1)
        elif isinstance(port_or_range, slice):
            ports_range = port_or_range
        elif isinstance(port_or_range, tuple):
            ports_range = slice(*port_or_range)
        else:
            raise UnaceptablePortsRangeTypeError

        self.ports_range: slice = ports_range

        if isinstance(statuses, PortStatus):
            statuses = {statuses}

        self.statuses: Union[PortStatus, Set[PortStatus]] = statuses
        self.desired_number_of_ports: int = desired_number_of_ports
        self.first_port: int = self._range_start()

    def __iter__(self):
        """Rewind the cursor to the start of ``ports_range`` and return ``self``."""
        self.first_port = self._range_start()
        return self

    def __next__(self):
        """Return the next run, then advance the cursor by one port.

        Because the cursor moves by one, consecutive results may overlap.
        """
        ports_range = self._iter_impl()
        self.first_port += 1
        return ports_range

    def _range_start(self) -> int:
        """Return the first port of ``ports_range``; an open start means port 0."""
        start = self.ports_range.start
        return 0 if start is None else start

    def _first_rejected_port(self, first: int, count: int) -> Optional[int]:
        """Return the first unacceptable port in ``first .. first + count - 1``, or None."""
        # The protocol's value is used as the index of its status in each entry.
        status_index = self.protocol.value
        table = self.used_ports.table
        for port in range(first, first + count):
            for port_lord in table[port].port_lords:
                if port_lord[status_index] not in self.statuses:
                    return port

        return None

    def _iter_impl(self) -> slice:
        """Find the next run starting at or after ``first_port``.

        On success ``first_port`` is left at the start of the returned run.

        Raises:
            StopIteration: No further run exists.
        """
        table_size = len(self.used_ports.table)
        stop = self.ports_range.stop
        if stop is None or stop > table_size:
            # Open-ended or oversized ranges end with the table.
            stop = table_size

        wanted = self.desired_number_of_ports
        # The second condition keeps the run inside the table: without it a
        # search near the last port would index past the end of the table.
        while self.first_port < stop and self.first_port + wanted <= table_size:
            rejected = self._first_rejected_port(self.first_port, wanted)
            if rejected is None:
                # A non-positive ``wanted`` checks nothing and yields one port,
                # as it always has.
                return slice(self.first_port, self.first_port + max(wanted, 1))

            # No run containing the rejected port can succeed; resume after it.
            self.first_port = rejected + 1

        raise StopIteration

    def __call__(self) -> Optional[slice]:
        """Return the next run without advancing past it, or None when there is none."""
        try:
            return self._iter_impl()
        except StopIteration:
            return None

    def shift_only(self, num: int = 1) -> None:
        """Move the cursor forward by ``num`` ports."""
        self.first_port += num

    def shift(self, num: int = 1) -> Optional[slice]:
        """Move the cursor forward by ``num`` ports and search from there."""
        self.first_port += num
        return self()

    def put_only(self, num: int) -> None:
        """Place the cursor on port ``num``."""
        self.first_port = num

    def put(self, num: int) -> Optional[slice]:
        """Place the cursor on port ``num`` and search from there."""
        self.first_port = num
        return self()
PortsIterator( used_ports: UsedPorts, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]], desired_number_of_ports: int)
def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
    """Prepare a search over ``used_ports``.

    ``port_or_range`` may be a single port (searched as a one-port slice),
    a slice (used as given) or a tuple (passed to ``slice(*port_or_range)``).
    A single ``PortStatus`` is wrapped into a one-element set.

    Raises:
        UnaceptablePortsRangeTypeError: ``port_or_range`` has another type.
    """
    self.used_ports: UsedPorts = used_ports
    self.protocol: Protocol = protocol
    self.ports_range: slice = None
    # int, slice and tuple are mutually exclusive, so the order of these
    # checks does not affect which branch runs.
    if isinstance(port_or_range, slice):
        self.ports_range = port_or_range
    elif isinstance(port_or_range, tuple):
        self.ports_range = slice(*port_or_range)
    elif isinstance(port_or_range, int):
        self.ports_range = slice(port_or_range, port_or_range + 1)
    else:
        raise UnaceptablePortsRangeTypeError

    self.statuses: Union[PortStatus, Set[PortStatus]] = {statuses} if isinstance(statuses, PortStatus) else statuses
    self.desired_number_of_ports: int = desired_number_of_ports
    # The search cursor starts at the beginning of the range.
    self.first_port = self.ports_range.start
used_ports: UsedPorts
protocol: Protocol
statuses: Union[PortStatus, Set[PortStatus]]
def
unify_ports( ports_range: typing.Union[slice, NoneType]) -> Union[Tuple[int, int], NoneType]:
class
UnaceptablePortsRangeTypeError(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args