cengal.file_system.directory_manager.versions.v_0.directory_manager
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Directory helpers: filtered listing and traversal of directory trees,
clearing / creating directories, hashing directory content and a few
helpers around the current working directory.
"""

import errno
from contextlib import contextmanager
import shutil
import os
import os.path
from pathlib import PurePath
from enum import Enum
import hashlib
from cengal.introspection.inspect import frame
from typing import Optional, Callable, Set, Tuple, Union, List, Any


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class FilteringType(Enum):
    """How a set of file extensions is applied when listing files."""

    including = 0  # include only files with extensions within the set of extensions
    excluding = 1  # filter out files with extensions within the set of extensions
    off = 2  # any file will fit criteria


def _filter_filenames(filenames, filtering_type, extentions_set) -> List[str]:
    """Return the names from ``filenames`` accepted by the extension filter."""
    if FilteringType.off == filtering_type:
        return list(filenames)

    if FilteringType.including == filtering_type:
        return [name for name in filenames if os.path.splitext(name)[1] in extentions_set]

    if FilteringType.excluding == filtering_type:
        return [name for name in filenames if os.path.splitext(name)[1] not in extentions_set]

    # Not a FilteringType member: no rule applies, so no file is accepted.
    return list()


def filtered_file_list(root_dir: str, filtering_type: FilteringType, extentions_set: Optional[Set] = None) -> Tuple:
    r"""Return the immediate content of ``root_dir`` with file names filtered by extension.

    Only the first item produced by ``os.walk(root_dir)`` is used, so
    sub-directories are listed but not descended into.

    Args:
        root_dir (str): directory to list, e.g. r'C:\dir\path'
        filtering_type (FilteringType): how ``extentions_set`` is applied.
        extentions_set (Optional[Set], optional): extensions with the leading
            dot, e.g. {'.upk', '.txt'}; compared verbatim with
            ``os.path.splitext(filename)[1]``. Defaults to None, which is
            allowed only with ``FilteringType.off``.

    Raises:
        ValueError: ``extentions_set`` is None while filtering is requested.
        StopIteration: ``os.walk`` produced nothing for ``root_dir`` (for
            example the directory does not exist or can not be listed).

    Returns:
        Tuple: (dirpath, dirnames, new_filenames)
    """
    if (FilteringType.off != filtering_type) and (extentions_set is None):
        raise ValueError('extentions_set can\'t be None with this filtering type')

    dirpath, dirnames, filenames = next(os.walk(root_dir))
    return dirpath, dirnames, _filter_filenames(filenames, filtering_type, extentions_set)


def filtered_file_list_traversal(root_dir: str, filtering_type: FilteringType, extentions_set: Optional[Set] = None,
                                 remove_empty_items: bool = False, use_spinner: bool = False
                                 ) -> List[Tuple[str, List[str], List[str]]]:
    r"""Walk the whole tree under ``root_dir`` and filter file names by extension.

    :param root_dir: str(); r'C:\dir\path'
    :param filtering_type: FilteringType()
    :param extentions_set: set(); {'.upk', '.txt'}. May be None only with FilteringType.off
    :param remove_empty_items: bool(); drop directories in which no file passed the filter
    :param use_spinner: bool(); show a console spinner (imports ``progress.spinner`` lazily),
        advanced once per 1000 visited directories
    :return: list() of tuple(); list of (dirpath, dirnames, new_filenames)
    :raises ValueError: extentions_set is None while filtering is requested
    """
    if (FilteringType.off != filtering_type) and (extentions_set is None):
        raise ValueError('extentions_set can\'t be None with this filtering type')

    spinner = None
    if use_spinner:
        # Imported lazily so the third-party package is needed only on request.
        from progress.spinner import Spinner
        spinner = Spinner('Loading ')

    spinner_index = 0
    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        new_filenames = _filter_filenames(filenames, filtering_type, extentions_set)
        if new_filenames or (not remove_empty_items):
            result_list.append((dirpath, dirnames, new_filenames))

        if spinner is not None:
            spinner_index += 1
            if spinner_index >= 1000:
                spinner_index = 0
                spinner.next()

    if spinner is not None:
        # Finish the spinner line.
        print()

    return result_list


class FilteringEntity(Enum):
    """Kind of data handed to a traversal ``filter`` callable."""

    dirpath = 0  # data is (dirpath, dirnames, filenames)
    dirname = 1  # data is (dirpath, dirname)
    filename = 2  # data is (dirpath, filename)
    aggregated = 3  # data is the complete result list


def _accept_everything(filtering_entity: FilteringEntity, data):
    """Default traversal filter: keep every entry and return the result list as is."""
    if FilteringEntity.aggregated == filtering_entity:
        return data

    return True


def file_list_traversal(root_dir: str, filter: Optional[Callable] = None, remove_empty_dirpaths=False) -> Union[Any, List[Tuple[str, List[str], List[str]]]]:
    r"""Walk the tree under ``root_dir`` and keep the entries accepted by ``filter``.

    ``filter`` is called as ``filter(filtering_entity, data)`` where ``data`` is:

    * ``FilteringEntity.dirpath``: (dirpath, dirnames, filenames); a falsy result
      skips this directory record (its sub-directories are still walked);
    * ``FilteringEntity.dirname``: (dirpath, dirname); a truthy result keeps the name;
    * ``FilteringEntity.filename``: (dirpath, filename); a truthy result keeps the name;
    * ``FilteringEntity.aggregated``: the result list; the returned value is
      what this function returns.

    :param root_dir: str(); r'C:\dir\path'
    :param filter: callable or None; None accepts everything
    :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files
        nor accepted sub-directories
    :return: result of the aggregated filter call; by default list() of tuple();
        list of (dirpath, new_dirnames, new_filenames)
    """
    if filter is None:
        filter = _accept_everything

    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        if not filter(FilteringEntity.dirpath, (dirpath, dirnames, filenames)):
            continue

        new_dirnames = [dirname for dirname in dirnames if filter(FilteringEntity.dirname, (dirpath, dirname))]
        new_filenames = [filename for filename in filenames if filter(FilteringEntity.filename, (dirpath, filename))]

        if remove_empty_dirpaths and (not new_filenames) and (not new_dirnames):
            continue

        result_list.append((dirpath, new_dirnames, new_filenames))

    return filter(FilteringEntity.aggregated, result_list)


def file_list_traversal_ex(
        root_dir: str,
        filter: Optional[Callable] = None,
        remove_empty_dirpaths=False,
        subignore_by_dirname=True,
        subignore_by_dirpath=True,
        subignore: Optional[Set[str]] = None
        ) -> Union[Any, List[Tuple[str, List[str], List[str]]]]:
    r"""Like ``file_list_traversal`` but rejected directories can hide their whole subtree.

    ``filter`` is called as ``filter(filtering_entity, data)`` with the same
    ``data`` layout as in ``file_list_traversal``.

    :param root_dir: str(); r'C:\dir\path'
    :param filter: callable or None; None accepts everything
    :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files
        nor accepted sub-directories
    :param subignore_by_dirname: bool(); when a dirname is rejected, add its full path to ``subignore``
    :param subignore_by_dirpath: bool(); when a dirpath is rejected, add it to ``subignore``
    :param subignore: set() of directory paths skipped together with everything below them.
        A provided set is updated in place during the traversal
    :return: result of the aggregated filter call; by default list() of tuple();
        list of (dirpath, new_dirnames, new_filenames)
    """
    if filter is None:
        filter = _accept_everything

    subignore = set() if subignore is None else subignore

    def is_subignored(dir_path: str) -> bool:
        # A path is ignored when it or any of its ancestors is in ``subignore``.
        if not subignore:
            return False

        while True:
            if dir_path in subignore:
                return True

            parent = os.path.dirname(dir_path)
            if parent == dir_path:
                # Reached the top of the path: nothing more to check.
                return False

            dir_path = parent

    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        if is_subignored(dirpath):
            continue

        if not filter(FilteringEntity.dirpath, (dirpath, dirnames, filenames)):
            if subignore_by_dirpath:
                subignore.add(dirpath)

            continue

        new_dirnames = list()
        for dirname in dirnames:
            full_dirname = os.path.join(dirpath, dirname)
            if is_subignored(full_dirname):
                continue

            if filter(FilteringEntity.dirname, (dirpath, dirname)):
                new_dirnames.append(dirname)
            elif subignore_by_dirname:
                subignore.add(full_dirname)

        new_filenames = [filename for filename in filenames if filter(FilteringEntity.filename, (dirpath, filename))]

        if remove_empty_dirpaths and (not new_filenames) and (not new_dirnames):
            continue

        result_list.append((dirpath, new_dirnames, new_filenames))

    return filter(FilteringEntity.aggregated, result_list)


def clear_dir(full_dir_path):
    """Remove everything inside ``full_dir_path`` while keeping the directory itself.

    Does nothing when the path is not an existing directory. Real
    sub-directories are removed with ``shutil.rmtree(..., ignore_errors=True)``;
    every other entry (files and symbolic links, including links that point
    to directories) is removed with ``os.remove`` and errors there propagate.

    :param full_dir_path: directory to clear
    """
    full_dir_path = os.path.normpath(full_dir_path)
    if not os.path.isdir(full_dir_path):
        return

    # Snapshot the entries first: the directory is modified below.
    with os.scandir(full_dir_path) as scan:
        entries = list(scan)

    sub_dirs = [entry for entry in entries if entry.is_dir(follow_symlinks=False)]
    others = [entry for entry in entries if not entry.is_dir(follow_symlinks=False)]

    for entry in sub_dirs:
        shutil.rmtree(entry.path, ignore_errors=True)

    # rmtree refuses symbolic links, so links are unlinked here and never followed.
    for entry in others:
        os.remove(entry.path)


@contextmanager
def change_current_dir(new_current_dir):
    """Context manager: switch the working directory, restore the previous one on exit."""
    new_current_dir = os.path.normpath(new_current_dir)
    cur_dir = os.getcwd()
    os.chdir(new_current_dir)
    try:
        yield
    finally:
        os.chdir(cur_dir)


@contextmanager
def secure_current_dir():
    """Context manager: restore the working directory that was current on entry."""
    cur_dir = os.getcwd()
    try:
        yield
    finally:
        os.chdir(cur_dir)


def get_dir_hash(full_dir_path, hash_format_string=None):
    """Return a SHA-512 based fingerprint of all file contents under ``full_dir_path``.

    File contents are fed to the hash in ``os.walk`` order; file and directory
    names are not part of the hash. Files are read in chunks, so memory use
    does not grow with the size of the tree.

    :param full_dir_path: root of the tree to hash
    :param hash_format_string: ``str.format`` template receiving the hex digest and
        the total content size as lower-case hex without the ``0x`` prefix.
        Defaults to '{} {}'
    :return: formatted string
    """
    hash_format_string = hash_format_string or '{} {}'
    chunk_size = 65536

    hasher = hashlib.sha512()
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(full_dir_path):
        for filename in filenames:
            with open(os.path.join(dirpath, filename), 'rb') as file:
                while True:
                    chunk = file.read(chunk_size)
                    if not chunk:
                        break

                    hasher.update(chunk)
                    total_size += len(chunk)

    return hash_format_string.format(hasher.hexdigest(), hex(total_size)[2:])


def current_src_dir(depth: Optional[int] = 1):
    """Return the directory of the source file that owns the selected stack frame.

    :param depth: 0 - path of this file, 1 - path of the caller's file, etc.
        None is treated as 0.
    :return: directory part of the real path of that frame's code file
    """
    # One extra level accounts for this function's own frame.
    frame_depth = (depth or 0) + 1
    code_file = frame(frame_depth).f_code.co_filename
    return os.path.dirname(os.path.realpath(code_file))


def ensure_dir(dir_path):
    """Create ``dir_path`` (with missing parents) unless it already is a directory.

    :param dir_path: directory that must exist afterwards
    :raises OSError: the directory could not be created, including the case
        where the path exists but is not a directory
    """
    dir_path = os.path.normpath(dir_path)
    if os.path.isdir(dir_path):
        return

    try:
        os.makedirs(dir_path, exist_ok=True)
    except OSError as er:
        # Windows can raise spurious ENOTEMPTY errors. See https://github.com/pypa/pip/issues/6426
        if er.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise

        # "Already exists" is fine only when what exists is a directory.
        if not os.path.isdir(dir_path):
            raise


def ensure_empty_dir(dir_path):
    """Make sure ``dir_path`` exists and then remove everything inside it."""
    ensure_dir(dir_path)
    clear_dir(dir_path)


def dir_exists(dir_path) -> bool:
    """Return True when ``dir_path`` is an existing directory."""
    return os.path.isdir(os.path.normpath(dir_path))


class DirNotExistsError(Exception):
    """Raised by ``dir_empty`` when the given path is not an existing directory."""


def dir_empty(dir_path) -> bool:
    """Return True when the existing directory ``dir_path`` has no entries.

    :raises DirNotExistsError: ``dir_path`` is not an existing directory
    """
    dir_path = os.path.normpath(dir_path)
    if not dir_exists(dir_path):
        raise DirNotExistsError

    with os.scandir(dir_path) as entries:
        return next(entries, None) is None


def path_relative_to_current_dir(relative_path: Optional[str] = None) -> str:
    """Return ``relative_path`` joined onto the current working directory and normalised.

    None or an empty string gives the current working directory itself.
    """
    relative_path = relative_path or str()
    return os.path.normpath(os.path.join(os.getcwd(), os.path.normpath(relative_path)))
class FilteringType(Enum):
    """How a set of file extensions is applied when listing files."""

    # Keep only the files whose extension is within the set of extensions.
    including = 0
    # Drop the files whose extension is within the set of extensions.
    excluding = 1
    # The set is not consulted: any file fits the criteria.
    off = 2
An enumeration.
Inherited Members
- enum.Enum
- name
- value
def filtered_file_list(root_dir: str, filtering_type: FilteringType, extentions_set: Optional[Set] = None) -> Tuple:
    r"""Return the immediate content of ``root_dir`` with file names filtered by extension.

    Only the first item produced by ``os.walk(root_dir)`` is used, so
    sub-directories are listed but not descended into.

    Args:
        root_dir (str): directory to list, e.g. r'C:\dir\path'
        filtering_type (FilteringType): how ``extentions_set`` is applied.
        extentions_set (Optional[Set], optional): extensions with the leading
            dot, e.g. {'.upk', '.txt'}; compared verbatim with
            ``os.path.splitext(filename)[1]``. Defaults to None, which is
            allowed only with ``FilteringType.off``.

    Raises:
        ValueError: ``extentions_set`` is None while filtering is requested.
        StopIteration: ``os.walk`` produced nothing for ``root_dir`` (for
            example the directory does not exist or can not be listed).

    Returns:
        Tuple: (dirpath, dirnames, new_filenames)
    """
    if (FilteringType.off != filtering_type) and (extentions_set is None):
        raise ValueError('extentions_set can\'t be None with this filtering type')

    dirpath, dirnames, filenames = next(os.walk(root_dir))

    if FilteringType.off == filtering_type:
        new_filenames = list(filenames)
    elif FilteringType.including == filtering_type:
        new_filenames = [name for name in filenames if os.path.splitext(name)[1] in extentions_set]
    elif FilteringType.excluding == filtering_type:
        new_filenames = [name for name in filenames if os.path.splitext(name)[1] not in extentions_set]
    else:
        # Not a FilteringType member: no rule applies, so no file is accepted.
        new_filenames = list()

    return dirpath, dirnames, new_filenames
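Return the immediate content of root_dir (sub-directory names and file names) with the file names filtered by extension.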
Args: root_dir (str): r'C:\dir\path' filtering_type (FilteringType): how extentions_set is applied (including, excluding or off) extentions_set (Optional[Set], optional): {'.upk', '.txt'}. Defaults to None.
Raises: Exception: extentions_set is None while filtering_type is not FilteringType.off
Returns: Tuple: (dirpath, dirnames, new_filenames)
def filtered_file_list_traversal(root_dir: str, filtering_type: FilteringType, extentions_set: Optional[Set] = None,
                                 remove_empty_items: bool = False, use_spinner: bool = False
                                 ) -> List[Tuple[str, List[str], List[str]]]:
    r"""Walk the whole tree under ``root_dir`` and filter file names by extension.

    :param root_dir: str(); r'C:\dir\path'
    :param filtering_type: FilteringType()
    :param extentions_set: set(); {'.upk', '.txt'}. May be None only with FilteringType.off
    :param remove_empty_items: bool(); drop directories in which no file passed the filter
    :param use_spinner: bool(); show a console spinner (imports ``progress.spinner`` lazily),
        advanced once per 1000 visited directories
    :return: list() of tuple(); list of (dirpath, dirnames, new_filenames)
    :raises ValueError: extentions_set is None while filtering is requested
    """
    if (FilteringType.off != filtering_type) and (extentions_set is None):
        raise ValueError('extentions_set can\'t be None with this filtering type')

    spinner = None
    if use_spinner:
        # Imported lazily so the third-party package is needed only on request.
        from progress.spinner import Spinner
        spinner = Spinner('Loading ')

    spinner_index = 0
    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        if FilteringType.off == filtering_type:
            new_filenames = list(filenames)
        elif FilteringType.including == filtering_type:
            new_filenames = [name for name in filenames if os.path.splitext(name)[1] in extentions_set]
        elif FilteringType.excluding == filtering_type:
            new_filenames = [name for name in filenames if os.path.splitext(name)[1] not in extentions_set]
        else:
            # Not a FilteringType member: no rule applies, so no file is accepted.
            new_filenames = list()

        if new_filenames or (not remove_empty_items):
            result_list.append((dirpath, dirnames, new_filenames))

        if spinner is not None:
            spinner_index += 1
            if spinner_index >= 1000:
                spinner_index = 0
                spinner.next()

    if spinner is not None:
        # Finish the spinner line.
        print()

    return result_list
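:param root_dir: str(); r'C:\dir\path' :param filtering_type: FilteringType() :param extentions_set: set(); {'.upk', '.txt'} :param remove_empty_items: bool(); drop directories in which no file passed the filter :param use_spinner: bool(); show a console spinner while walking :return: list() of tuple(); list of (dirpath, dirnames, new_filenames)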
An enumeration.
Inherited Members
- enum.Enum
- name
- value
def file_list_traversal(root_dir: str, filter: Optional[Callable] = None, remove_empty_dirpaths=False) -> Union[Any, List[Tuple[str, List[str], List[str]]]]:
    r"""Walk the tree under ``root_dir`` and keep the entries accepted by ``filter``.

    ``filter`` is called as ``filter(filtering_entity, data)`` where ``data`` is:

    * ``FilteringEntity.dirpath``: (dirpath, dirnames, filenames); a falsy result
      skips this directory record (its sub-directories are still walked);
    * ``FilteringEntity.dirname``: (dirpath, dirname); a truthy result keeps the name;
    * ``FilteringEntity.filename``: (dirpath, filename); a truthy result keeps the name;
    * ``FilteringEntity.aggregated``: the result list; the returned value is
      what this function returns.

    :param root_dir: str(); r'C:\dir\path'
    :param filter: callable or None; None accepts everything
    :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files
        nor accepted sub-directories
    :return: result of the aggregated filter call; by default list() of tuple();
        list of (dirpath, new_dirnames, new_filenames)
    """
    if filter is None:
        def default_filter(filtering_entity: FilteringEntity, data):
            # Keep every entry and hand the result list back untouched.
            return data if FilteringEntity.aggregated == filtering_entity else True

        filter = default_filter

    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        if not filter(FilteringEntity.dirpath, (dirpath, dirnames, filenames)):
            continue

        new_dirnames = [dirname for dirname in dirnames if filter(FilteringEntity.dirname, (dirpath, dirname))]
        new_filenames = [filename for filename in filenames if filter(FilteringEntity.filename, (dirpath, filename))]

        if remove_empty_dirpaths and (not new_filenames) and (not new_dirnames):
            continue

        result_list.append((dirpath, new_dirnames, new_filenames))

    return filter(FilteringEntity.aggregated, result_list)
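:param root_dir: str(); r'C:\dir\path' :param filter: callable(filtering_entity, data) or None; None accepts everything :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files nor accepted sub-directories :return: result of filter(FilteringEntity.aggregated, result_list); by default list() of tuple(); list of (dirpath, new_dirnames, new_filenames)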
def file_list_traversal_ex(
        root_dir: str,
        filter: Optional[Callable] = None,
        remove_empty_dirpaths=False,
        subignore_by_dirname=True,
        subignore_by_dirpath=True,
        subignore: Optional[Set[str]] = None
        ) -> Union[Any, List[Tuple[str, List[str], List[str]]]]:
    r"""Walk the tree under ``root_dir``; rejected directories can hide their whole subtree.

    ``filter`` is called as ``filter(filtering_entity, data)`` where ``data`` is:

    * ``FilteringEntity.dirpath``: (dirpath, dirnames, filenames); a falsy result
      skips this directory record;
    * ``FilteringEntity.dirname``: (dirpath, dirname); a truthy result keeps the name;
    * ``FilteringEntity.filename``: (dirpath, filename); a truthy result keeps the name;
    * ``FilteringEntity.aggregated``: the result list; the returned value is
      what this function returns.

    :param root_dir: str(); r'C:\dir\path'
    :param filter: callable or None; None accepts everything
    :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files
        nor accepted sub-directories
    :param subignore_by_dirname: bool(); when a dirname is rejected, add its full path to ``subignore``
    :param subignore_by_dirpath: bool(); when a dirpath is rejected, add it to ``subignore``
    :param subignore: set() of directory paths skipped together with everything below them.
        A provided set is updated in place during the traversal
    :return: result of the aggregated filter call; by default list() of tuple();
        list of (dirpath, new_dirnames, new_filenames)
    """
    if filter is None:
        def default_filter(filtering_entity: FilteringEntity, data):
            # Keep every entry and hand the result list back untouched.
            return data if FilteringEntity.aggregated == filtering_entity else True

        filter = default_filter

    subignore = set() if subignore is None else subignore

    def is_subignored(dir_path: str) -> bool:
        # A path is ignored when it or any of its ancestors is in ``subignore``.
        if not subignore:
            return False

        while True:
            if dir_path in subignore:
                return True

            parent = os.path.dirname(dir_path)
            if parent == dir_path:
                # Reached the top of the path: nothing more to check.
                return False

            dir_path = parent

    result_list = list()
    for dirpath, dirnames, filenames in os.walk(root_dir):
        if is_subignored(dirpath):
            continue

        if not filter(FilteringEntity.dirpath, (dirpath, dirnames, filenames)):
            if subignore_by_dirpath:
                subignore.add(dirpath)

            continue

        new_dirnames = list()
        for dirname in dirnames:
            full_dirname = os.path.join(dirpath, dirname)
            if is_subignored(full_dirname):
                continue

            if filter(FilteringEntity.dirname, (dirpath, dirname)):
                new_dirnames.append(dirname)
            elif subignore_by_dirname:
                subignore.add(full_dirname)

        new_filenames = [filename for filename in filenames if filter(FilteringEntity.filename, (dirpath, filename))]

        if remove_empty_dirpaths and (not new_filenames) and (not new_dirnames):
            continue

        result_list.append((dirpath, new_dirnames, new_filenames))

    return filter(FilteringEntity.aggregated, result_list)
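:param root_dir: str(); r'C:\dir\path' :param filter: callable(filtering_entity, data) or None; None accepts everything :param remove_empty_dirpaths: bool(); skip directories left with neither accepted files nor accepted sub-directories :param subignore_by_dirname: bool(); add the full path of a rejected dirname to subignore :param subignore_by_dirpath: bool(); add a rejected dirpath to subignore :param subignore: set() of directory paths skipped together with everything below them; a provided set is updated in place :return: result of filter(FilteringEntity.aggregated, result_list); by default list() of tuple(); list of (dirpath, new_dirnames, new_filenames)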
def clear_dir(full_dir_path):
    """Remove everything inside ``full_dir_path`` while keeping the directory itself.

    Does nothing when the path is not an existing directory. Real
    sub-directories are removed with ``shutil.rmtree(..., ignore_errors=True)``;
    every other entry (files and symbolic links, including links that point
    to directories) is removed with ``os.remove`` and errors there propagate.

    :param full_dir_path: directory to clear
    """
    full_dir_path = os.path.normpath(full_dir_path)
    if not os.path.isdir(full_dir_path):
        return

    # Snapshot the entries first: the directory is modified below.
    with os.scandir(full_dir_path) as scan:
        entries = list(scan)

    sub_dirs = [entry for entry in entries if entry.is_dir(follow_symlinks=False)]
    others = [entry for entry in entries if not entry.is_dir(follow_symlinks=False)]

    for entry in sub_dirs:
        shutil.rmtree(entry.path, ignore_errors=True)

    # rmtree refuses symbolic links, so links are unlinked here and never followed.
    for entry in others:
        os.remove(entry.path)
def get_dir_hash(full_dir_path, hash_format_string=None):
    """Return a SHA-512 based fingerprint of all file contents under ``full_dir_path``.

    File contents are fed to the hash in ``os.walk`` order; file and directory
    names are not part of the hash. Files are read in chunks, so memory use
    does not grow with the size of the tree.

    :param full_dir_path: root of the tree to hash
    :param hash_format_string: ``str.format`` template receiving the hex digest and
        the total content size as lower-case hex without the ``0x`` prefix.
        Defaults to '{} {}'
    :return: formatted string
    """
    hash_format_string = hash_format_string or '{} {}'
    chunk_size = 65536

    hasher = hashlib.sha512()
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(full_dir_path):
        for filename in filenames:
            with open(os.path.join(dirpath, filename), 'rb') as file:
                while True:
                    chunk = file.read(chunk_size)
                    if not chunk:
                        break

                    hasher.update(chunk)
                    total_size += len(chunk)

    return hash_format_string.format(hasher.hexdigest(), hex(total_size)[2:])
def current_src_dir(depth: Optional[int] = 1):
    """Return the directory of the source file that owns the selected stack frame.

    :param depth: 0 - path of this file, 1 - path of the caller's file, etc.
        None is treated as 0.
    :return: directory part of the real path of that frame's code file
    """
    # One extra level is added on top of the requested depth before the lookup.
    frame_depth = (depth or 0) + 1
    code_file = frame(frame_depth).f_code.co_filename
    real_file = os.path.realpath(code_file)
    return os.path.dirname(real_file)
:param depth: 0 - path of this file, 1 - path of the caller's file, etc. :return:
def ensure_dir(dir_path):
    """Create ``dir_path`` (with missing parents) unless it already is a directory.

    :param dir_path: directory that must exist afterwards
    :raises OSError: the directory could not be created, including the case
        where the path exists but is not a directory
    """
    dir_path = os.path.normpath(dir_path)
    if os.path.isdir(dir_path):
        return

    try:
        os.makedirs(dir_path, exist_ok=True)
    except OSError as er:
        # Windows can raise spurious ENOTEMPTY errors. See https://github.com/pypa/pip/issues/6426
        if er.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise

        # "Already exists" is fine only when what exists is a directory;
        # otherwise the caller would go on without the directory it asked for.
        if not os.path.isdir(dir_path):
            raise
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args