cengal.io.used_ports.versions.v_0.used_ports

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18"""
 19Module Docstring
 20Docstrings: http://www.python.org/dev/peps/pep-0257/
 21"""
 22
 23__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 24__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 25__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 26__license__ = "Apache License, Version 2.0"
 27__version__ = "4.4.1"
 28__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 29__email__ = "gtalk@butenkoms.space"
 30# __status__ = "Prototype"
 31__status__ = "Development"
 32# __status__ = "Production"
 33
 34
 35__all__ = ['PortStatus', 'Table', 'Protocol', 'purify_ports', 'get_tables', 'used_ports', 'PortLord', 'PortInfo', 'UsedPorts', 'PortsIterator', 'unify_ports', 'UnaceptablePortsRangeTypeError']
 36
 37import pickle
 38from enum import Enum
 39from typing import Dict, Optional, Set, List, Tuple, Union, Any
 40from collections import namedtuple
 41from cengal.file_system.path_manager import path_relative_to_src
 42
 43
 44class PortStatus(str, Enum):
 45    na = 'N/A'                 # In programming APIs (not in communication between hosts), requests a system-allocated (dynamic) port
 46    yes = 'Yes'                # Described protocol is assigned for this port by IANA and is standardized, specified, or widely used for such.
 47    unofficial = 'Unofficial'  # Described protocol is not assigned for this port by IANA but is standardized, specified, or widely used for such.
 48    assigned = 'Assigned'      # Described protocol is assigned by IANA for this port, but is not standardized, specified, or widely used for such.
 49    no = 'No'                  # Described protocol is not assigned by IANA, standardized, specified, or widely used for the port.
 50    reserved = 'Reserved'      # Port is reserved by IANA, generally to prevent collision having its previous use removed. The port number may be available for assignment upon request to IANA.
 51
 52
 53class Table(str, Enum):
 54    system = 'system'
 55    user = 'user'
 56    ephemeral = 'ephemeral'
 57    
 58
 59class Protocol(Enum):
 60    tcp = 0
 61    udp = 1
 62    sctp = 2
 63    dccp = 3
 64
 65
 66_tables: Dict[Table, List[List]] = None
 67_known_tables_names = {'system', 'user', 'ephemeral'}
 68_used_ports = None
 69
 70
 71def _purify_item(item: Optional[str]) -> PortStatus:
 72    if item is None:
 73        return PortStatus.no
 74    
 75    return PortStatus(item)
 76
 77
 78def get_tables() -> Dict[Table, List[List]]:
 79    global _tables
 80    if _tables is None:
 81        tables: Dict[Table, List[List]] = dict()
 82        for table_name in _known_tables_names:
 83            with open(path_relative_to_src(f'data/table_{table_name}.pickle'), 'rb') as file:
 84                tables[Table(table_name)] = pickle.load(file)
 85
 86        _tables = tables
 87    
 88    return _tables
 89
 90
 91def used_ports() -> 'UsedPorts':
 92    global _used_ports
 93    if _used_ports is None:
 94        _used_ports = UsedPorts()
 95    
 96    return _used_ports
 97
 98
 99PortLord = namedtuple('PortLord', 'tcp udp sctp dccp description')
100
101
class PortInfo:
    """Everything known about a single port number."""

    def __init__(self, port: int, port_lords: Optional[Set[PortLord]] = None) -> None:
        """Create the record for ``port``.

        Args:
            port: The port number this record describes.
            port_lords: Initial registrations. ``None`` or an empty set
                results in a fresh, empty set being created.
        """
        self.port: int = port
        # A falsy argument (None or an empty set) is replaced by a new set.
        self.port_lords = port_lords if port_lords else set()

    def add(self, port_lord: PortLord) -> int:
        """Register ``port_lord`` for this port.

        Returns:
            The number of distinct registrations after the insertion.
        """
        lords = self.port_lords
        lords.add(port_lord)
        return len(lords)
110
111
class UsedPorts:
    """Per-port registry built from the raw tables returned by ``get_tables``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self) -> None:
        """Build one ``PortInfo`` per port number and fill it from the raw tables."""
        # Indexed by port number: table[n] describes port n (0..65535).
        self.table: List[PortInfo] = [PortInfo(number) for number in range(65536)]
        # Port-number span covered by each table.
        self.tables = {
            Table.system: slice(0, 1024),
            Table.user: slice(1024, 49152),
            Table.ephemeral: slice(49152, 65536),
        }

        for raw_table in get_tables().values():
            for port_or_range, tcp, udp, sctp, dccp, description in raw_table:
                lord = PortLord(
                    _purify_item(tcp),
                    _purify_item(udp),
                    _purify_item(sctp),
                    _purify_item(dccp),
                    description,
                )
                if isinstance(port_or_range, tuple):
                    # A tuple is an inclusive (first, last) span of ports.
                    first = int(port_or_range[0])
                    last = int(port_or_range[1])
                    numbers = range(first, last + 1)
                else:
                    numbers = (int(port_or_range),)

                for number in numbers:
                    self.table[number].add(lord)

        # Ports that no table mentions get a single "not used" registration so
        # that every PortInfo has at least one entry to inspect.
        unclaimed = PortLord(PortStatus.no, PortStatus.no, PortStatus.no, PortStatus.no, '')
        for info in self.table:
            if not info.port_lords:
                info.port_lords.add(unclaimed)

    def port(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]]) -> 'PortsIterator':
        """Return an iterator that looks for single ports (runs of length 1)."""
        return PortsIterator(self, protocol, port_or_range, statuses, 1)

    def range(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Return an iterator that looks for runs of ``desired_number_of_ports`` ports."""
        return PortsIterator(self, protocol, port_or_range, statuses, desired_number_of_ports)

    def __call__(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
        """Shorthand for ``self.range(...)`` with the same arguments."""
        return self.range(protocol, port_or_range, statuses, desired_number_of_ports)
144
145
class UnaceptablePortsRangeTypeError(Exception):
    """Raised by ``PortsIterator`` when ``port_or_range`` is not an int, slice or tuple."""
148
149
class PortsIterator:
    """Search a ``UsedPorts`` table for runs of consecutive acceptable ports.

    A port is acceptable for the chosen protocol when every status recorded
    for it (one per ``PortLord``) is contained in ``statuses``. A run is
    ``desired_number_of_ports`` consecutive acceptable ports and is reported
    as a ``slice(first_port, last_port + 1)``.

    The object keeps a cursor (``first_port``). It can be used as a regular
    iterator (``for run in iterator``) or driven manually through
    ``__call__``, ``shift`` and ``put``, which return ``None`` instead of
    raising ``StopIteration`` when nothing is found.

    Only the first port of a run has to lie inside ``ports_range``; the run
    itself may extend past ``ports_range.stop``.
    NOTE(review): confirm whether runs were meant to be clipped to the range.
    """

    def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
        """Prepare a search.

        Args:
            used_ports: Registry whose ``table`` is inspected.
            protocol: Protocol whose status field is checked on each port.
            port_or_range: A single port number, a ``slice`` (its ``step`` is
                ignored), or a tuple passed to ``slice(*tuple)`` - so a
                ``(start, stop)`` tuple has an exclusive ``stop``.
            statuses: One acceptable status or a set of them.
            desired_number_of_ports: Length of the run to look for. With 0
                no status check is performed at all, so callers should pass
                a value of at least 1.

        Raises:
            UnaceptablePortsRangeTypeError: ``port_or_range`` is not an int,
                a slice or a tuple.
        """
        self.used_ports: UsedPorts = used_ports
        self.protocol: Protocol = protocol
        if isinstance(port_or_range, int):
            ports_range = slice(port_or_range, port_or_range + 1)
        elif isinstance(port_or_range, slice):
            ports_range = port_or_range
        elif isinstance(port_or_range, tuple):
            ports_range = slice(*port_or_range)
        else:
            raise UnaceptablePortsRangeTypeError(
                f'port_or_range must be an int, a slice or a tuple, not {type(port_or_range).__name__}'
            )

        self.ports_range: slice = ports_range

        # A single status is wrapped so the search always works with a set.
        if isinstance(statuses, PortStatus):
            statuses = {statuses}

        self.statuses: Set[PortStatus] = statuses
        self.desired_number_of_ports: int = desired_number_of_ports
        # Cursor: the port at which the next search starts.
        self.first_port: int = self.ports_range.start

    def __iter__(self):
        """Rewind the cursor to the start of the range and return ``self``."""
        self.first_port = self.ports_range.start
        return self

    def __next__(self):
        """Return the next run and advance the cursor by one port.

        Advancing by a single port means consecutive results may overlap.
        """
        found = self._iter_impl()
        self.first_port += 1
        return found

    def _iter_impl(self) -> slice:
        """Find the first run starting at or after the cursor.

        On success the cursor is left on the first port of the returned run.

        Raises:
            StopIteration: No run starts inside ``ports_range`` at or after
                the cursor, or the run would need ports beyond the end of
                the table.
        """
        table = self.used_ports.table
        table_size = len(table)
        status_index = self.protocol.value
        range_stop = self.ports_range.stop

        while self.first_port < range_stop:
            run_stop: Optional[int] = self.first_port + 1
            for offset in range(self.desired_number_of_ports):
                port = self.first_port + offset
                if port >= table_size:
                    # The run needs a port that does not exist. Any later
                    # start would need an even higher one, so the search is
                    # over (previously this surfaced as an IndexError).
                    raise StopIteration

                if port < 0:
                    # Negative numbers are not ports; without this guard the
                    # lookup would silently wrap around to the end of the table.
                    is_good = False
                else:
                    port_statuses = {lord[status_index] for lord in table[port].port_lords}
                    # Acceptable only if no recorded status falls outside the allowed set.
                    is_good = not (port_statuses - self.statuses)

                if is_good:
                    run_stop = port + 1
                else:
                    run_stop = None
                    # Restart just after the offending port, and never below port 0.
                    self.first_port = max(port + 1, 0)
                    break

            if run_stop is not None:
                return slice(self.first_port, run_stop)

        raise StopIteration

    def __call__(self) -> Optional[slice]:
        """Search from the current cursor; return the run or ``None`` if there is none."""
        try:
            return self._iter_impl()
        except StopIteration:
            return None

    def shift_only(self, num: int = 1) -> None:
        """Move the cursor forward by ``num`` ports without searching."""
        self.first_port += num

    def shift(self, num: int = 1) -> Optional[slice]:
        """Move the cursor forward by ``num`` ports, then search from there."""
        self.first_port += num
        return self()

    def put_only(self, num: int) -> None:
        """Place the cursor on port ``num`` without searching."""
        self.first_port = num

    def put(self, num: int) -> Optional[slice]:
        """Place the cursor on port ``num``, then search from there."""
        self.first_port = num
        return self()
230
231
def purify_ports(ports_range: Optional[slice]) -> Optional[Union[int, Tuple[int, int]]]:
    """Convert a half-open port slice into its most compact inclusive form.

    Args:
        ports_range: Slice whose ``stop`` is exclusive, or ``None`` (which is
            what ``PortsIterator.__call__`` returns when nothing was found).

    Returns:
        ``None`` for ``None``; the bare port number when the slice covers a
        single port; otherwise an inclusive ``(first, last)`` tuple.
    """
    # Pass "nothing found" straight through instead of failing on None.start.
    if ports_range is None:
        return None

    first = ports_range.start
    last = ports_range.stop - 1
    if first == last:
        return first

    return first, last
239
240
def unify_ports(ports_range: Optional[slice]) -> Optional[Tuple[int, int]]:
    """Convert a half-open port slice into an inclusive ``(first, last)`` tuple.

    Unlike ``purify_ports``, a single-port slice is still returned as a tuple
    (with ``first == last``).

    Args:
        ports_range: Slice whose ``stop`` is exclusive, or ``None`` (which is
            what ``PortsIterator.__call__`` returns when nothing was found).

    Returns:
        ``None`` for ``None``; otherwise ``(start, stop - 1)``.
    """
    # Pass "nothing found" straight through instead of failing on None.start.
    if ports_range is None:
        return None

    return ports_range.start, ports_range.stop - 1
245    
class PortStatus(builtins.str, enum.Enum):
45class PortStatus(str, Enum):
46    na = 'N/A'                 # In programming APIs (not in communication between hosts), requests a system-allocated (dynamic) port
47    yes = 'Yes'                # Described protocol is assigned for this port by IANA and is standardized, specified, or widely used for such.
48    unofficial = 'Unofficial'  # Described protocol is not assigned for this port by IANA but is standardized, specified, or widely used for such.
49    assigned = 'Assigned'      # Described protocol is assigned by IANA for this port, but is not standardized, specified, or widely used for such.
50    no = 'No'                  # Described protocol is not assigned by IANA, standardized, specified, or widely used for the port.
51    reserved = 'Reserved'      # Port is reserved by IANA, generally to prevent collision having its previous use removed. The port number may be available for assignment upon request to IANA.

An enumeration.

na = <PortStatus.na: 'N/A'>
yes = <PortStatus.yes: 'Yes'>
unofficial = <PortStatus.unofficial: 'Unofficial'>
assigned = <PortStatus.assigned: 'Assigned'>
no = <PortStatus.no: 'No'>
reserved = <PortStatus.reserved: 'Reserved'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Table(builtins.str, enum.Enum):
54class Table(str, Enum):
55    system = 'system'
56    user = 'user'
57    ephemeral = 'ephemeral'

An enumeration.

system = <Table.system: 'system'>
user = <Table.user: 'user'>
ephemeral = <Table.ephemeral: 'ephemeral'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Protocol(enum.Enum):
60class Protocol(Enum):
61    tcp = 0
62    udp = 1
63    sctp = 2
64    dccp = 3

An enumeration.

tcp = <Protocol.tcp: 0>
udp = <Protocol.udp: 1>
sctp = <Protocol.sctp: 2>
dccp = <Protocol.dccp: 3>
Inherited Members
enum.Enum
name
value
def purify_ports( ports_range: typing.Union[slice, NoneType]) -> Union[int, Tuple[int, int], NoneType]:
233def purify_ports(ports_range: Optional[slice]) -> Optional[Union[int, Tuple[int, int]]]:
234    start = ports_range.start
235    stop = ports_range.stop - 1
236    if start == stop:
237        return start
238    else:
239        return start, stop
def get_tables() -> Dict[Table, List[List]]:
79def get_tables() -> Dict[Table, List[List]]:
80    global _tables
81    if _tables is None:
82        tables: Dict[Table, List[List]] = dict()
83        for table_name in _known_tables_names:
84            with open(path_relative_to_src(f'data/table_{table_name}.pickle'), 'rb') as file:
85                tables[Table(table_name)] = pickle.load(file)
86
87        _tables = tables
88    
89    return _tables
def used_ports() -> UsedPorts:
92def used_ports() -> 'UsedPorts':
93    global _used_ports
94    if _used_ports is None:
95        _used_ports = UsedPorts()
96    
97    return _used_ports
class PortLord(builtins.tuple):

PortLord(tcp, udp, sctp, dccp, description)

PortLord(tcp, udp, sctp, dccp, description)

Create new instance of PortLord(tcp, udp, sctp, dccp, description)

tcp

Alias for field number 0

udp

Alias for field number 1

sctp

Alias for field number 2

dccp

Alias for field number 3

description

Alias for field number 4

Inherited Members
builtins.tuple
index
count
class PortInfo:
103class PortInfo:
104    def __init__(self, port: int, port_lords: Optional[Set[PortLord]] = None) -> None:
105        self.port: int = port
106        self.port_lords = port_lords or set()
107    
108    def add(self, port_lord: PortLord) -> int:
109        self.port_lords.add(port_lord)
110        return len(self.port_lords)
PortInfo( port: int, port_lords: typing.Union[typing.Set[PortLord], NoneType] = None)
104    def __init__(self, port: int, port_lords: Optional[Set[PortLord]] = None) -> None:
105        self.port: int = port
106        self.port_lords = port_lords or set()
port: int
port_lords
def add( self, port_lord: PortLord) -> int:
108    def add(self, port_lord: PortLord) -> int:
109        self.port_lords.add(port_lord)
110        return len(self.port_lords)
class UsedPorts:
113class UsedPorts:
114    def __init__(self) -> None:
115        self.table: List[PortInfo] = list([PortInfo(i) for i in range(65536)])
116        self.tables = {
117            Table.system: slice(0, 1024),
118            Table.user: slice(1024, 49152),
119            Table.ephemeral: slice(49152, 65536),
120        }
121        raw_tables: Dict[Table, List[List]] = get_tables()
122        for table_name, raw_table in raw_tables.items():
123            for row in raw_table:
124                port_or_range, tcp, udp, sctp, dccp, description = row
125                port_lord = PortLord(_purify_item(tcp), _purify_item(udp), _purify_item(sctp), _purify_item(dccp), description)
126                if isinstance(port_or_range, tuple):
127                    for port in range(int(port_or_range[0]), int(port_or_range[1]) + 1):
128                        self.table[port].add(port_lord)
129                else:
130                    self.table[int(port_or_range)].add(port_lord)
131        
132        not_used_port = PortLord(PortStatus.no, PortStatus.no, PortStatus.no, PortStatus.no, str())
133        for port_info in self.table:
134            if not port_info.port_lords:
135                port_info.port_lords.add(not_used_port)
136    
137    def port(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]]) -> 'PortsIterator':
138        return PortsIterator(self, protocol, port_or_range, statuses, 1)
139    
140    def range(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
141        return PortsIterator(self, protocol, port_or_range, statuses, desired_number_of_ports)
142    
143    def __call__(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
144        return self.range(protocol, port_or_range, statuses, desired_number_of_ports)
table: List[PortInfo]
tables
def port( self, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]]) -> PortsIterator:
137    def port(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]]) -> 'PortsIterator':
138        return PortsIterator(self, protocol, port_or_range, statuses, 1)
def range( self, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]], desired_number_of_ports: int) -> PortsIterator:
140    def range(self, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> 'PortsIterator':
141        return PortsIterator(self, protocol, port_or_range, statuses, desired_number_of_ports)
class PortsIterator:
151class PortsIterator:
152    def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
153        self.used_ports: UsedPorts = used_ports
154        self.protocol: Protocol = protocol
155        self.ports_range: slice = None
156        if isinstance(port_or_range, int):
157            self.ports_range = slice(port_or_range, port_or_range + 1)
158        elif isinstance(port_or_range, slice):
159            self.ports_range = port_or_range
160        elif isinstance(port_or_range, tuple):
161            self.ports_range = slice(*port_or_range)
162        else:
163            raise UnaceptablePortsRangeTypeError
164        
165        if isinstance(statuses, PortStatus):
166            statuses = {statuses}
167        
168        self.statuses: Union[PortStatus, Set[PortStatus]] = statuses
169        self.desired_number_of_ports: int = desired_number_of_ports
170        self.first_port = self.ports_range.start
171
172    def __iter__(self):
173        self.first_port = self.ports_range.start
174        return self
175
176    def __next__(self):
177        ports_range = self._iter_impl()
178        self.first_port += 1
179        return ports_range
180    
181    def _iter_impl(self) -> slice:
182        if self.first_port >= self.ports_range.stop:
183            raise StopIteration
184        
185        while self.first_port < self.ports_range.stop:
186            result_range_stop = self.first_port + 1
187            for offset in range(self.desired_number_of_ports):
188                port = self.first_port + offset
189                
190                port_info: PortInfo = self.used_ports.table[port]
191                port_statuses: Set[PortStatus] = set()
192                for port_lord in port_info.port_lords:
193                    port_statuses.add(port_lord[self.protocol.value])
194                
195                if 0 == len(port_statuses - self.statuses):
196                    is_good = True
197                else:
198                    is_good = False
199                
200                if is_good:
201                    result_range_stop = port + 1
202                else:
203                    result_range_stop = None
204                    self.first_port = port + 1
205                    break
206            
207            if result_range_stop is not None:
208                return slice(self.first_port, result_range_stop)
209        
210        raise StopIteration
211    
212    def __call__(self) -> Optional[slice]:
213        try:
214            return self._iter_impl()
215        except StopIteration:
216            return None
217
218    def shift_only(self, num: int = 1) -> None:
219        self.first_port += num
220
221    def shift(self, num: int = 1) -> Optional[slice]:
222        self.first_port += num
223        return self()
224
225    def put_only(self, num: int) -> None:
226        self.first_port = num
227
228    def put(self, num: int) -> Optional[slice]:
229        self.first_port = num
230        return self()
PortsIterator( used_ports: UsedPorts, protocol: Protocol, port_or_range: typing.Union[int, slice, typing.Tuple[int, int]], statuses: typing.Union[PortStatus, typing.Set[PortStatus]], desired_number_of_ports: int)
152    def __init__(self, used_ports: UsedPorts, protocol: Protocol, port_or_range: Union[int, slice, Tuple[int, int]], statuses: Union[PortStatus, Set[PortStatus]], desired_number_of_ports: int) -> None:
153        self.used_ports: UsedPorts = used_ports
154        self.protocol: Protocol = protocol
155        self.ports_range: slice = None
156        if isinstance(port_or_range, int):
157            self.ports_range = slice(port_or_range, port_or_range + 1)
158        elif isinstance(port_or_range, slice):
159            self.ports_range = port_or_range
160        elif isinstance(port_or_range, tuple):
161            self.ports_range = slice(*port_or_range)
162        else:
163            raise UnaceptablePortsRangeTypeError
164        
165        if isinstance(statuses, PortStatus):
166            statuses = {statuses}
167        
168        self.statuses: Union[PortStatus, Set[PortStatus]] = statuses
169        self.desired_number_of_ports: int = desired_number_of_ports
170        self.first_port = self.ports_range.start
used_ports: UsedPorts
protocol: Protocol
ports_range: slice
statuses: Union[PortStatus, Set[PortStatus]]
desired_number_of_ports: int
first_port
def shift_only(self, num: int = 1) -> None:
218    def shift_only(self, num: int = 1) -> None:
219        self.first_port += num
def shift(self, num: int = 1) -> Union[slice, NoneType]:
221    def shift(self, num: int = 1) -> Optional[slice]:
222        self.first_port += num
223        return self()
def put_only(self, num: int) -> None:
225    def put_only(self, num: int) -> None:
226        self.first_port = num
def put(self, num: int) -> Union[slice, NoneType]:
228    def put(self, num: int) -> Optional[slice]:
229        self.first_port = num
230        return self()
def unify_ports( ports_range: typing.Union[slice, NoneType]) -> Union[Tuple[int, int], NoneType]:
242def unify_ports(ports_range: Optional[slice]) -> Optional[Tuple[int, int]]:
243    start = ports_range.start
244    stop = ports_range.stop - 1
245    return start, stop
class UnaceptablePortsRangeTypeError(builtins.Exception):
147class UnaceptablePortsRangeTypeError(Exception):
148    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args