cengal.web_tools.help_tools.versions.v_0.help_tools
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import platform
if 'PyPy' != platform.python_implementation():
    import requests
import binascii
import os, os.path
import pickle
import datetime
# try to import C parser then fallback in pure python parser.
try:
    from http_parser.parser import HttpParser
except ImportError:
    from http_parser.pyparser import HttpParser

from cengal.modules_management.alternative_import import alt_import

with alt_import('lzma') as lzma:
    if lzma is None:
        import lzmaffi.compat
        lzmaffi.compat.register()
        import lzma


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


def remove_percent_encoding_from_the_URI(string, plus=True):
    """Decode and sanitize a percent-encoded URI string.

    ``%u`` escapes are removed undecoded, ``%XX`` escapes are decoded (or
    removed when malformed or when they encode a disallowed byte), and
    disallowed bytes that are present literally are stripped. Disallowed
    bytes are 0-31 plus ``"`` ``*`` ``:`` ``<`` ``>`` ``?`` ``|`` and 127.

    Args:
        string: Value to decode; converted with ``str()``.
        plus: When true (default), every ``+`` becomes a space first.

    Returns:
        The decoded ``str``, or ``None`` when the text cannot be encoded as
        UTF-8 or the resulting bytes are not valid UTF-8.
    """
    disallowed = bytes(tuple(range(32)) + (34, 42, 58, 60, 62, 63, 124, 127))
    try:
        text = str(string)
        if plus:
            text = text.replace('+', ' ')
        data = text.encode(encoding='utf-8')

        # Pass 1: "%u" plus up to four following bytes is dropped, never decoded.
        position = data.find(b'%u')
        while position > -1:
            data = data[:position] + data[position + 6:]
            position = data.find(b'%u')

        # Pass 2: "%XX". The search restarts from the beginning each time, so
        # a "%" produced by decoding "%25" is processed as an escape again.
        position = data.find(b'%')
        while position > -1:
            decoded = b''
            try:
                candidate = binascii.unhexlify(data[position + 1:position + 3])
            except binascii.Error:
                candidate = b''
            if candidate and candidate[0] not in disallowed:
                decoded = candidate
            data = data[:position] + decoded + data[position + 3:]
            position = data.find(b'%')

        # Pass 3: remove disallowed bytes that were present literally.
        data = data.translate(None, disallowed)
        return data.decode(encoding='utf-8')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return None


def get_standard_folder_separator():
    """Return the folder separator used by this module (``'/'``)."""
    return '/'


def unify_folder_separators(string):
    """Return ``str(string)`` with every backslash replaced by the standard separator."""
    return str(string).replace('\\', get_standard_folder_separator())


def remove_forbidden_file_names_from_the_URI(string):
    """Strip reserved device names (``con``, ``prn``, ``com1`` ...) from a path.

    Folder separators are unified first. A path segment that equals a
    forbidden name, or starts with one followed by ``.``, is replaced by a
    single separator; matching is case-insensitive and repeated until nothing
    changes, so the result does not depend on the order of the names.

    Returns:
        ``(path, was_changed)``. When nothing matched, ``path`` is the
        separator-unified input with its case kept; otherwise it is the
        lower-cased, cleaned path.
    """
    separator = get_standard_folder_separator()
    forbidden_names = (
        'con', 'prn', 'aux', 'nul',
        'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9',
        'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9',
    )
    original = unify_folder_separators(str(string))
    lowered = original.lower()

    cleaned = lowered
    while True:
        before_pass = cleaned
        for name in forbidden_names:
            cleaned = cleaned.replace(separator + name + separator, separator)
            cleaned = cleaned.replace(separator + name + '.', separator)
            # A reserved name as the final segment has no trailing separator.
            if cleaned.endswith(separator + name):
                cleaned = cleaned[:-len(name)]
        if cleaned == before_pass:
            break

    if cleaned == lowered:
        return (original, False)
    return (cleaned, True)


def remove_path_to_the_parent_of_the_current_directory(string):
    """Remove ``../`` and ``/..`` sequences from a separator-unified path.

    The replacement is repeated until the text stops changing, because one
    pass can assemble a new ``../`` out of the leftovers (``....//``).
    """
    string = unify_folder_separators(str(string))
    while True:
        reduced = string.replace('../', '/').replace('/..', '/')
        if reduced == string:
            return reduced
        string = reduced


def is_path_is_trying_to_leave_site_sandbox(string):
    """Return True when, read left to right, ``..`` segments climb above the start of the path."""
    depth = 0
    for segment in string.split(get_standard_folder_separator()):
        if segment in ('', '.'):
            continue
        depth += -1 if segment == '..' else 1
        if depth < 0:
            return True
    return False


def is_path_is_not_safe(string):
    """Decode a URI path and report whether it is unsafe.

    Returns:
        ``(decoded_path, is_not_safe)``; ``decoded_path`` is ``None`` when
        percent-decoding failed.
    """
    decoded = remove_percent_encoding_from_the_URI(string)
    if decoded is None:
        return (decoded, True)
    cleaned, was_changed = remove_forbidden_file_names_from_the_URI(decoded)
    if was_changed:
        return (cleaned, True)
    return (cleaned, is_path_is_trying_to_leave_site_sandbox(cleaned))


def web_server__is_redirection_to_the_main_domain_needed(httpParser: HttpParser, prefix=None):
    """Normalise the request's ``Host`` header and say whether it had to change.

    Empty leading/trailing domain labels are dropped and, when ``prefix`` is
    given and the domain contains a dot, ``prefix`` is made the first label.

    Returns:
        ``(redirect_needed, host)`` where ``host`` is the normalised value,
        with any ``:``-separated parts after the domain re-attached.

    Raises:
        KeyError: if the parsed headers have no ``'Host'`` entry.
    """
    raw_host = httpParser.get_headers()['Host']
    host_parts = None
    domain = raw_host
    if ':' in raw_host:
        host_parts = [part for part in raw_host.split(':') if part != '']
        domain = host_parts[0]

    redirect_needed = False
    if '.' in domain:
        labels = domain.split('.')
        if labels[0] == '':
            labels.pop(0)
            redirect_needed = True
        if prefix is not None and labels[0] != prefix:
            labels.insert(0, prefix)
            redirect_needed = True
        if labels[-1] == '':
            labels.pop()
            redirect_needed = True
        domain = '.'.join(labels)

    if host_parts is not None and len(host_parts) > 1:
        host_parts[0] = domain
        domain = ':'.join(host_parts)

    return (redirect_needed, domain)


def load_cache_from_file(fileName, originalCache):
    """Merge a pickled, LZMA-compressed cache file into ``originalCache``.

    Loading is best-effort: a missing, unreadable or corrupt file leaves
    ``originalCache`` as it is.

    Returns:
        ``originalCache`` (the same object), updated in place on success.
    """
    fileName = unify_folder_separators(fileName)
    if not os.path.isfile(fileName):
        return originalCache
    try:
        with lzma.open(fileName, 'rb') as cacheDataFile:
            # NOTE(security): pickle.load can execute arbitrary code; only
            # point this at cache files written by a trusted process.
            loaded = pickle.load(cacheDataFile)
        originalCache.update(loaded)
    except (EnvironmentError, pickle.PickleError, lzma.LZMAError, EOFError):
        pass
    return originalCache


def save_cache_to_file(fileName, cache):
    """Write ``cache`` to an XZ-compressed pickle file if it was changed.

    Nothing happens unless ``cache.isWasChanged`` is true. An existing file
    is moved to ``<fileName>.bak`` first; the backup is deleted after a
    successful dump and moved back if the dump fails. Errors are swallowed,
    and ``cache.isWasChanged`` is reset only after a successful write.
    """
    fileName = unify_folder_separators(fileName)
    if not cache.isWasChanged:
        return
    backupName = fileName + '.bak'

    isRenamedToBak = False
    if os.path.exists(fileName):
        try:
            if os.path.exists(backupName):
                try:
                    os.remove(backupName)
                except IOError:
                    pass
            os.rename(fileName, backupName)
            isRenamedToBak = True
        except IOError:
            pass

    isDumped = False
    try:
        with lzma.open(fileName, 'wb', format=lzma.FORMAT_XZ, check=lzma.CHECK_CRC64, preset=1) as cacheDataFile:
            pickle.dump(cache, cacheDataFile)
        cache.isWasChanged = False
        isDumped = True
    except (EnvironmentError, pickle.PicklingError):
        pass
    finally:
        if isRenamedToBak and not isDumped:
            # Put the previous good file back over the partial one.
            try:
                os.replace(backupName, fileName)
            except OSError:
                pass

    if isRenamedToBak and isDumped:
        try:
            os.remove(backupName)
        except IOError:
            pass


class CachedRequestError(Exception):
    """Raised when only cached results are allowed and the cache has none."""


def make_cached_request__get(cache, request, useOnlyCachedResults=False):
    """Return the cached response for ``request``, fetching it with HTTP GET if needed.

    A cached entry is used only when it exists and its ``ok`` is true.

    Raises:
        CachedRequestError: no usable cached entry and
            ``useOnlyCachedResults`` is true.
    """
    cached = cache.try_to_get_data_for_request(request)
    if (cached is not None) and cached.ok:
        return cached
    if useOnlyCachedResults:
        raise CachedRequestError()
    response = requests.get(unify_folder_separators(request))
    fresh = SerializableHttpResponseFromRequests(response)
    cache.put_new_request(request, fresh)
    return fresh


def make_cached_request__universal(cache, request, workerFunction, useOnlyCachedResults=False, **workerFunctionParams):
    """Return the cached value for ``request``, computing it with ``workerFunction`` on a miss.

    Raises:
        CachedRequestError: nothing is cached and ``useOnlyCachedResults``
            is true.
    """
    cached = cache.try_to_get_data_for_request(request)
    if cached is not None:
        return cached
    if useOnlyCachedResults:
        raise CachedRequestError()
    fresh = workerFunction(**workerFunctionParams)
    cache.put_new_request(request, fresh)
    return fresh


def make_a_copy_of_cached_request_with_another_keyname(cache, current_name, new_name, make_first_copy_only=False):
    """Store the entry cached under ``current_name`` under ``new_name`` as well.

    With ``make_first_copy_only`` an entry already present under
    ``new_name`` is left alone. Nothing is stored when ``current_name`` has
    no cached data.
    """
    if make_first_copy_only and cache.try_to_get_data_for_request(new_name) is not None:
        return
    result = cache.try_to_get_data_for_request(current_name)
    if result is not None:
        cache.put_new_request(new_name, result)


class SerializableHttpResponseFromRequests:
    """Plain-attribute snapshot of an HTTP response object."""

    def __init__(self, request, isBinaryFile=False):
        """Copy the fields of ``request`` (a response object) onto this instance."""
        super().__init__()
        self.isBinaryFile = isBinaryFile
        self.url = request.url
        self.ok = request.ok
        self.reason = request.reason
        self.status_code = request.status_code
        self.headers = request.headers
        self.encoding = request.encoding
        # Body bytes, or None when the response object has no ``content``.
        self.content = getattr(request, 'content', None)
        # Not captured from the response; always None after construction.
        self.apparent_encoding = None
        self.text = None

    def getResultEncoding(self):
        """Return ``encoding``, else ``apparent_encoding``, else ``'utf-8'``."""
        if self.encoding is not None:
            return self.encoding
        if self.apparent_encoding is not None:
            return self.apparent_encoding
        return 'utf-8'

    def getBytesContent(self):
        """Return the body as bytes; raise ``Exception`` when there is no body."""
        if self.content is not None:
            return self.content
        if self.text is not None:
            return bytes(self.text, self.getResultEncoding())
        print("Can't get data from the request object on url ", self.url)
        raise Exception("Can't get data from the request object on url {}".format(self.url))


def saveRequestedFileToFS(folderName, requestResult):
    """Write the body of ``requestResult`` into ``folderName``.

    The file is named after the last path component of ``requestResult.url``.
    The payload is resolved before the file is opened, so a response with no
    data does not create or truncate a file.

    Raises:
        Exception: ``requestResult`` has neither ``content`` nor ``text``.
    """
    if requestResult.content is not None:
        payload = requestResult.content
    elif requestResult.text is not None:
        payload = bytes(requestResult.text, requestResult.getResultEncoding())
    else:
        ex_text = "Can't save object from {} to the fie. Can't get data to save".format(requestResult.url)
        print(ex_text)
        raise Exception(ex_text)

    fileName = os.path.join(folderName, os.path.basename(requestResult.url))
    fileName = unify_folder_separators(fileName)
    with open(fileName, 'wb') as file:
        file.write(payload)


def getFileModificationDate(fileName):
    """Return the last-modification time of ``fileName`` as a naive local ``datetime``."""
    fileName = unify_folder_separators(fileName)
    return datetime.datetime.fromtimestamp(os.path.getmtime(fileName))
def
remove_percent_encoding_from_the_URI(string, plus=True):
def remove_percent_encoding_from_the_URI(string, plus=True):
    """Decode and sanitize a percent-encoded URI string.

    The value is converted with ``str()``, optionally has every ``+`` turned
    into a space, and is then processed as UTF-8 bytes in three passes:

    1. every ``%u`` escape (the prefix plus up to four following bytes) is
       removed without being decoded;
    2. every ``%XX`` escape is replaced by the byte it encodes, or removed
       when ``XX`` is missing, is not valid hex, or encodes a disallowed
       byte. The search restarts from the beginning after each replacement,
       so a ``%`` produced by decoding ``%25`` is itself treated as the
       start of an escape (``%2541`` ends up as ``A``);
    3. disallowed bytes that are present literally are removed.

    Disallowed bytes are 0-31 plus ``"`` ``*`` ``:`` ``<`` ``>`` ``?`` ``|``
    and 127.

    Args:
        string: Value to decode; any object accepted by ``str()``.
        plus: When true (default), ``+`` is replaced with a space first.

    Returns:
        The decoded ``str``, or ``None`` when the text cannot be encoded as
        UTF-8 or the resulting bytes are not valid UTF-8.
    """
    disallowed = bytes(tuple(range(32)) + (34, 42, 58, 60, 62, 63, 124, 127))
    try:
        text = str(string)
        if plus:
            text = text.replace('+', ' ')
        data = text.encode(encoding='utf-8')

        # Pass 1: "%u" plus up to four following bytes is dropped, never decoded.
        position = data.find(b'%u')
        while position > -1:
            data = data[:position] + data[position + 6:]
            position = data.find(b'%u')

        # Pass 2: "%XX" escapes.
        position = data.find(b'%')
        while position > -1:
            decoded = b''
            try:
                # Zero or one hex digit left at the end of the input yields
                # b'' or binascii.Error; both mean "remove the escape".
                candidate = binascii.unhexlify(data[position + 1:position + 3])
            except binascii.Error:
                candidate = b''
            if candidate and candidate[0] not in disallowed:
                decoded = candidate
            data = data[:position] + decoded + data[position + 3:]
            position = data.find(b'%')

        # Pass 3: remove disallowed bytes that were present literally.
        data = data.translate(None, disallowed)
        return data.decode(encoding='utf-8')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return None
def
get_standard_folder_separator():
def
unify_folder_separators(string):
def
remove_forbidden_file_names_from_the_URI(string):
def remove_forbidden_file_names_from_the_URI(string):
    """Strip reserved device names (``con``, ``prn``, ``com1`` ...) from a path.

    Folder separators are unified first (``\\`` becomes the standard
    separator). A path segment that equals a forbidden name, or that starts
    with one followed by ``.``, is replaced by a single separator. Matching
    is case-insensitive.

    The replacement is repeated over an ordered tuple of names until nothing
    changes. A single pass over an unordered set can leave a reserved name
    behind, and which one depends on iteration order. A reserved name that
    is the final segment (``/a/con``) is removed as well.

    Args:
        string: Path or URI; converted with ``str()``.

    Returns:
        ``(path, was_changed)``. When nothing matched, ``path`` is the
        separator-unified input with its original case; otherwise it is the
        lower-cased, cleaned path and ``was_changed`` is ``True``.
    """
    separator = get_standard_folder_separator()
    forbidden_names = (
        'con', 'prn', 'aux', 'nul',
        'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9',
        'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9',
    )
    original = unify_folder_separators(str(string))
    lowered = original.lower()

    cleaned = lowered
    while True:
        before_pass = cleaned
        for name in forbidden_names:
            cleaned = cleaned.replace(separator + name + separator, separator)
            cleaned = cleaned.replace(separator + name + '.', separator)
            # A reserved name as the final segment has no trailing separator.
            if cleaned.endswith(separator + name):
                cleaned = cleaned[:-len(name)]
        # Every replacement shortens the text, so this loop terminates.
        if cleaned == before_pass:
            break

    if cleaned == lowered:
        return (original, False)
    return (cleaned, True)
def
remove_path_to_the_parent_of_the_current_directory(string):
def
is_path_is_trying_to_leave_site_sandbox(string):
def is_path_is_trying_to_leave_site_sandbox(string):
    """Tell whether a path climbs above its starting directory.

    The path is split on the standard folder separator and read left to
    right: an ordinary segment goes one level down, ``..`` goes one level
    up, and empty segments and ``.`` are ignored.

    Args:
        string: Path that already uses the standard folder separator.

    Returns:
        ``True`` as soon as the running depth becomes negative, otherwise
        ``False``.
    """
    depth = 0
    for segment in string.split(get_standard_folder_separator()):
        if segment in ('', '.'):
            continue
        depth += -1 if segment == '..' else 1
        if depth < 0:
            return True
    return False
def
is_path_is_not_safe(string):
def is_path_is_not_safe(string):
    """Decode a URI path and report whether it is unsafe.

    The path is unsafe when percent-decoding fails, when a forbidden file
    name had to be removed from it, or when it tries to leave the site
    sandbox.

    Args:
        string: Raw (possibly percent-encoded) path.

    Returns:
        ``(decoded_path, is_not_safe)``; ``decoded_path`` is ``None`` when
        percent-decoding failed.
    """
    decoded = remove_percent_encoding_from_the_URI(string)
    if decoded is None:
        return (decoded, True)

    cleaned, was_changed = remove_forbidden_file_names_from_the_URI(decoded)
    if was_changed:
        return (cleaned, True)

    return (cleaned, is_path_is_trying_to_leave_site_sandbox(cleaned))
def
web_server__is_redirection_to_the_main_domain_needed(httpParser: http_parser.parser.HttpParser, prefix=None):
def web_server__is_redirection_to_the_main_domain_needed(httpParser: HttpParser, prefix=None):
    """Normalise the request's ``Host`` header and say whether it had to change.

    The header is split on ``:`` (empty pieces are discarded) and the first
    piece is taken as the domain. If the domain contains a dot, an empty
    leading label and an empty trailing label are dropped, and ``prefix``,
    when given, is inserted as the first label unless it is already there.
    Any remaining ``:``-separated pieces are then re-attached.

    Args:
        httpParser: Parser whose ``get_headers()`` result holds ``'Host'``.
        prefix: Optional first domain label to enforce (for example
            ``'www'``).

    Returns:
        ``(redirect_needed, host)``. ``redirect_needed`` is ``True`` only
        when a label was dropped or the prefix was inserted.

    Raises:
        KeyError: if the parsed headers have no ``'Host'`` entry.
    """
    raw_host = httpParser.get_headers()['Host']

    host_parts = None
    domain = raw_host
    if ':' in raw_host:
        host_parts = [part for part in raw_host.split(':') if part != '']
        domain = host_parts[0]

    redirect_needed = False
    if '.' in domain:
        labels = domain.split('.')
        if labels[0] == '':
            labels.pop(0)
            redirect_needed = True
        if prefix is not None and labels[0] != prefix:
            labels.insert(0, prefix)
            redirect_needed = True
        if labels[-1] == '':
            labels.pop()
            redirect_needed = True
        domain = '.'.join(labels)

    if host_parts is not None and len(host_parts) > 1:
        host_parts[0] = domain
        domain = ':'.join(host_parts)

    return (redirect_needed, domain)
def
load_cache_from_file(fileName, originalCache):
def load_cache_from_file(fileName, originalCache):
    """Merge a pickled, LZMA-compressed cache file into ``originalCache``.

    Loading is best-effort: when the file is missing, unreadable, truncated
    or corrupt, ``originalCache`` is returned as it is.

    Args:
        fileName: Path of the cache file; backslashes are unified first.
        originalCache: Cache object exposing ``update()``; it is modified
            in place.

    Returns:
        ``originalCache`` (the same object), updated on success.
    """
    fileName = unify_folder_separators(fileName)
    if not os.path.isfile(fileName):
        return originalCache
    try:
        with lzma.open(fileName, 'rb') as cacheDataFile:
            # NOTE(security): pickle.load can execute arbitrary code; only
            # point this at cache files written by a trusted process.
            loaded = pickle.load(cacheDataFile)
        originalCache.update(loaded)
    except (EnvironmentError, pickle.PickleError, lzma.LZMAError, EOFError):
        # PickleError covers UnpicklingError, which is what a damaged file
        # raises on load; LZMAError and EOFError cover corrupt or truncated
        # archives.
        pass
    return originalCache
def
save_cache_to_file(fileName, cache):
def save_cache_to_file(fileName, cache):
    """Write ``cache`` to an XZ-compressed pickle file if it was changed.

    Nothing happens unless ``cache.isWasChanged`` is true. An existing file
    is first moved to ``<fileName>.bak``. After a successful dump the backup
    is deleted and ``cache.isWasChanged`` is reset to ``False``. If the dump
    fails the backup is moved back over the partial file, so the last good
    cache is not lost. File-system and pickling errors are swallowed.

    Args:
        fileName: Destination path; backslashes are unified first.
        cache: Picklable object with a boolean ``isWasChanged`` attribute.
    """
    fileName = unify_folder_separators(fileName)
    if not cache.isWasChanged:
        return
    backupName = fileName + '.bak'

    isRenamedToBak = False
    if os.path.exists(fileName):
        try:
            if os.path.exists(backupName):
                try:
                    os.remove(backupName)
                except IOError:
                    pass
            os.rename(fileName, backupName)
            isRenamedToBak = True
        except IOError:
            pass

    isDumped = False
    try:
        with lzma.open(fileName, 'wb', format=lzma.FORMAT_XZ, check=lzma.CHECK_CRC64, preset=1) as cacheDataFile:
            pickle.dump(cache, cacheDataFile)
        # Reached only after the file has been closed without error.
        cache.isWasChanged = False
        isDumped = True
    except (EnvironmentError, pickle.PicklingError):
        pass
    finally:
        if isRenamedToBak and not isDumped:
            # Put the previous good file back over the partial one.
            try:
                os.replace(backupName, fileName)
            except OSError:
                pass

    if isRenamedToBak and isDumped:
        try:
            os.remove(backupName)
        except IOError:
            pass
class
CachedRequestError(builtins.Exception):
class CachedRequestError(Exception):
    """Raised when only cached results are allowed and the cache has none."""
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
def
make_cached_request__get(cache, request, useOnlyCachedResults=False):
def make_cached_request__get(cache, request, useOnlyCachedResults=False):
    """Return the cached response for ``request``, fetching it with HTTP GET if needed.

    A cached entry is used only when it exists and its ``ok`` attribute is
    true. Otherwise the URL (with backslashes unified) is requested, the
    response is wrapped in ``SerializableHttpResponseFromRequests`` and
    stored in the cache under ``request``.

    Args:
        cache: Object providing ``try_to_get_data_for_request`` and
            ``put_new_request``.
        request: URL; also used as the cache key.
        useOnlyCachedResults: When true, never touch the network.

    Returns:
        The cached or freshly stored response wrapper.

    Raises:
        CachedRequestError: no usable cached entry and
            ``useOnlyCachedResults`` is true.
    """
    cached = cache.try_to_get_data_for_request(request)
    if (cached is not None) and cached.ok:
        return cached

    if useOnlyCachedResults:
        raise CachedRequestError()

    response = requests.get(unify_folder_separators(request))
    fresh = SerializableHttpResponseFromRequests(response)
    cache.put_new_request(request, fresh)
    return fresh
def
make_cached_request__universal( cache, request, workerFunction, useOnlyCachedResults=False, **workerFunctionParams):
def make_cached_request__universal(cache, request, workerFunction, useOnlyCachedResults=False, **workerFunctionParams):
    """Return the cached value for ``request``, computing it on a miss.

    Args:
        cache: Object providing ``try_to_get_data_for_request`` and
            ``put_new_request``.
        request: Cache key.
        workerFunction: Callable that produces the value on a cache miss.
        useOnlyCachedResults: When true, never call ``workerFunction``.
        **workerFunctionParams: Keyword arguments passed to
            ``workerFunction``.

    Returns:
        The cached value, or the newly computed and stored one.

    Raises:
        CachedRequestError: nothing is cached and ``useOnlyCachedResults``
            is true.
    """
    cached = cache.try_to_get_data_for_request(request)
    if cached is not None:
        return cached

    if useOnlyCachedResults:
        raise CachedRequestError()

    fresh = workerFunction(**workerFunctionParams)
    cache.put_new_request(request, fresh)
    return fresh
def
make_a_copy_of_cached_request_with_another_keyname(cache, current_name, new_name, make_first_copy_only=False):
def make_a_copy_of_cached_request_with_another_keyname(cache, current_name, new_name, make_first_copy_only=False):
    """Store the entry cached under ``current_name`` under ``new_name`` as well.

    Args:
        cache: Object providing ``try_to_get_data_for_request`` and
            ``put_new_request``.
        current_name: Key of the existing entry.
        new_name: Key to store the copy under.
        make_first_copy_only: When true, an entry that already exists under
            ``new_name`` is left untouched, so only the first copy is made.

    Nothing is stored when ``current_name`` has no cached data.
    """
    # The guard must look at the destination key. Probing the source key
    # would block the copy whenever there is something to copy.
    if make_first_copy_only and cache.try_to_get_data_for_request(new_name) is not None:
        return
    result = cache.try_to_get_data_for_request(current_name)
    if result is not None:
        cache.put_new_request(new_name, result)
class
SerializableHttpResponseFromRequests:
class SerializableHttpResponseFromRequests:
    """Plain-attribute snapshot of an HTTP response object.

    Only simple attributes are copied from the response passed to the
    constructor.
    """

    def __init__(self, request, isBinaryFile=False):
        """Copy the fields of a response object onto this instance.

        Args:
            request: Response object exposing ``url``, ``ok``, ``reason``,
                ``status_code``, ``headers``, ``encoding`` and, optionally,
                ``content``.
            isBinaryFile: Stored as-is on the instance.
        """
        super().__init__()
        self.isBinaryFile = isBinaryFile
        self.url = request.url
        self.ok = request.ok
        self.reason = request.reason
        self.status_code = request.status_code
        self.headers = request.headers
        self.encoding = request.encoding
        # Body bytes, or None when the response object has no ``content``.
        self.content = getattr(request, 'content', None)
        # Not captured from the response; always None after construction.
        self.apparent_encoding = None
        self.text = None

    def getResultEncoding(self):
        """Return ``encoding``, else ``apparent_encoding``, else ``'utf-8'``."""
        if self.encoding is not None:
            return self.encoding
        if self.apparent_encoding is not None:
            return self.apparent_encoding
        return 'utf-8'

    def getBytesContent(self):
        """Return the response body as bytes.

        ``content`` is returned when set; otherwise ``text`` is encoded with
        ``getResultEncoding()``.

        Raises:
            Exception: both ``content`` and ``text`` are ``None``.
        """
        if self.content is not None:
            return self.content
        if self.text is not None:
            return bytes(self.text, self.getResultEncoding())
        print("Can't get data from the request object on url ", self.url)
        raise Exception("Can't get data from the request object on url {}".format(self.url))
SerializableHttpResponseFromRequests(request, isBinaryFile=False)
306 def __init__(self, request, isBinaryFile=False): 307 super(SerializableHttpResponseFromRequests, self).__init__() 308 self.isBinaryFile = isBinaryFile 309 self.url = request.url 310 self.ok = request.ok 311 self.reason = request.reason 312 self.status_code = request.status_code 313 self.headers = request.headers 314 self.encoding = request.encoding 315 316 if 'content' in dir(request): 317 self.content = request.content 318 else: 319 self.content = None 320 321 # if not isBinaryFile: 322 # if 'apparent_encoding' in dir(request): 323 # self.apparent_encoding = request.apparent_encoding 324 # else: 325 # self.apparent_encoding = None 326 # else: 327 # self.apparent_encoding = None 328 self.apparent_encoding = None 329 330 # if (self.encoding is not None) or (self.apparent_encoding is not None): 331 # self.text = request.text 332 # else: 333 # self.text = None 334 self.text = None
def
saveRequestedFileToFS(folderName, requestResult):
def saveRequestedFileToFS(folderName, requestResult):
    """Write the body of ``requestResult`` into ``folderName``.

    The file is named after the last path component of
    ``requestResult.url``. The payload is resolved before the file is
    opened, so a response with no data does not create or truncate a file.

    Args:
        folderName: Destination folder, with or without a trailing
            separator.
        requestResult: Object exposing ``url``, ``content``, ``text`` and
            ``getResultEncoding()``.

    Raises:
        Exception: ``requestResult`` has neither ``content`` nor ``text``.
    """
    if requestResult.content is not None:
        payload = requestResult.content
    elif requestResult.text is not None:
        payload = bytes(requestResult.text, requestResult.getResultEncoding())
    else:
        ex_text = "Can't save object from {} to the fie. Can't get data to save".format(requestResult.url)
        print(ex_text)
        raise Exception(ex_text)

    # os.path.join supplies the separator when folderName lacks a trailing one.
    fileName = os.path.join(folderName, os.path.basename(requestResult.url))
    fileName = unify_folder_separators(fileName)
    with open(fileName, 'wb') as file:
        file.write(payload)
def
getFileModificationDate(fileName):