cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['Db', 'DbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType', 'DbId', 'DbName', 'DbKeyError'] 20 21from cengal.parallel_execution.coroutines.coro_scheduler import * 22from cengal.parallel_execution.coroutines.coro_tools.await_coro import * 23from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import * 24from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority 25from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import * 26from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import * 27from cengal.parallel_execution.coroutines.coro_standard_services.instance import * 28from cengal.file_system.file_manager import path_relative_to_current_dir 29from cengal.time_management.cpu_clock_cycles import perf_counter 30from cengal.data_manipulation.serialization import * 31from cengal.introspection.inspect import get_exception 32from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType 33from cengal.file_system.path_manager import RelativePath 34from typing import Hashable, Tuple, List, Any, Dict, Callable, Sequence 35import sys 36import os 37import asyncio 38try: 39 import lmdb 40except ImportError: 41 from warnings import warn 42 warn('''WARNING: `lmdb` library is not installed. Db service will not work. 43 To install `lmdb` use: `pip install lmdb`''') 44 raise 45 46from os.path import normpath 47from uuid import uuid4 48 49 50""" 51Module Docstring 52Docstrings: http://www.python.org/dev/peps/pep-0257/ 53""" 54 55__author__ = "ButenkoMS <gtalk@butenkoms.space>" 56__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 57__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 58__license__ = "Apache License, Version 2.0" 59__version__ = "4.4.1" 60__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 61__email__ = "gtalk@butenkoms.space" 62# __status__ = "Prototype" 63__status__ = "Development" 64# __status__ = "Production" 65 66 67SingleKeyType = Union[bytes, str] 68CompoundKeyType = Union[List[SingleKeyType], Set[SingleKeyType], Tuple[SingleKeyType], List['CompoundKeyType'], Set['CompoundKeyType'], Tuple['CompoundKeyType']] 69KeyType = Union[SingleKeyType, CompoundKeyType] 70NormalizedCompoundKeyType = Union[Tuple[SingleKeyType], Tuple['NormalizedCompoundKeyType']] 71NormalizedKeyType = Union[SingleKeyType, NormalizedCompoundKeyType] 72RawKeyType = bytes # By default record keys are limited to 511 bytes in length, however this can be adjusted by rebuilding the library. The compile-time key length can be queried via Environment.max_key_size() 73ValueType = Any 74RawValueType = bytes 75DbId = Hashable 76DbName = bytes 77EnvId = Hashable 78 79 80class KeyInfo: 81 def __init__(self, key: KeyType, db_id: DbId = None, env_id: EnvId = None): 82 self.key: KeyType = key 83 self.db_id: DbId = db_id 84 self.env_id: EnvId = env_id 85 86 87class EnvInitInfo: 88 def __init__(self, path_to_db_environment: str, *args, **kwargs): 89 self.path_to_default_db_environment: str = normpath(path_to_db_environment) 90 self.args: Tuple = (self.path_to_default_db_environment,) + args 91 self.kwargs: Dict = kwargs 92 93 94class EnvInfo: 95 db_name_prefix = '__db_name_key__' 96 97 def __init__(self, init_info: EnvInitInfo, env: lmdb.Environment): 98 self.init_info: EnvInitInfo = init_info 99 self.env: lmdb.Environment = env 100 self.env_id: EnvId = init_info.path_to_default_db_environment 101 self.databases: Dict[Hashable, lmdb._Database] = dict() 102 self.db_names: Dict[DbId, DbName] = dict() 103 104 @staticmethod 105 def gen_db_name_from_db_id(db_id: DbId) -> bytes: 106 return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8') 107 108 def db_name_by_db_id(self, db_id: DbId) -> bytes: 109 db_name: bytes = EnvInfo.gen_db_name_from_db_id(db_id) 110 try: 111 return self.db_names[db_name] 112 except KeyError: 113 raise UnknownEnvDBError(self.env_id, db_id) 114 115 def db_by_db_id(self, db_id: DbId) -> lmdb._Database: 116 try: 117 return self.databases[db_id] 118 except KeyError: 119 raise UnknownEnvDBError(self.env_id, db_id) 120 121 122class DbKeyError(KeyError): 123 def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None: 124 super().__init__(*args) 125 self.key_info: Tuple[KeyType, DbId] = key_info 126 127 128class DBError(Exception): 129 def __init__(self, db_id: DbId, original_exception: Exception, *args): 130 super().__init__(*args) 131 self.db_id: DbId = db_id 132 self.original_exception = original_exception 133 134 @staticmethod 135 def from_exception(db_id: DbId) -> 'DBError': 136 return DBError(db_id, get_exception()) 137 138 139class UnknownEnvError(Exception): 140 pass 141 142 143class UnknownEnvDBError(Exception): 144 pass 145 146 147class WrongKeyTypeError(TypeError): 148 pass 149 150 151class RawKeyIsTooLargeError(ValueError): 152 pass 153 154 155class NormalizedCompoundKey: 156 def __init__(self, normalized_key: NormalizedKeyType) -> None: 157 self.normalized_key: NormalizedKeyType = normalized_key 158 159 def __call__(self) -> Any: 160 return self.normalized_key 161 162 @staticmethod 163 def from_key(key: KeyType): 164 return NormalizedCompoundKey(normalize_compound_key(key)) 165 166 167NCK = NormalizedCompoundKey 168InputKeyType = Union[NormalizedCompoundKey, KeyType] 169 170 171def is_normalized_compound_key(key: KeyType) -> bool: 172 if isinstance(key, (bytes, str, int, float)): 173 return True 174 elif isinstance(key, tuple): 175 return all(is_normalized_compound_key(item) for item in key) 176 else: 177 return False 178 179 180def normalize_compound_key(key: KeyType) -> NormalizedKeyType: 181 if isinstance(key, NormalizedCompoundKey): 182 return key() 183 elif is_normalized_compound_key(key): 184 return key 185 elif isinstance(key, list): 186 need_to_sort: bool = False 187 elif isinstance(key, (set, frozenset)): 188 need_to_sort = True 189 elif isinstance(key, dict): 190 key = tuple(key.items()) 191 need_to_sort = True 192 else: 193 raise WrongKeyTypeError(f'Wrong key type: {type(key)}: {key}') 194 195 new_key = list() 196 for item in key: 197 new_key.append(normalize_compound_key(item)) 198 199 if need_to_sort: 200 new_key.sort() 201 202 key = tuple(new_key) 203 204 return key 205 206 207class DbRequest(ServiceRequest): 208 def __init__(self, env_id: EnvId = None, db_id: DbId = None): 209 super().__init__() 210 self.env_id: EnvId = env_id 211 self.db_id: DbId = db_id 212 self.provide_to_request_handler = True 213 214 def _copy(self) -> 'DbRequest': 215 return DbRequest(self.env_id, self.db_id) 216 217 def _get_db_id(self, db_id: DbId) -> DbId: 218 if self.db_id is None: 219 return db_id 220 else: 221 return self.db_id 222 223 def set_default_db_environment_path(self, path_to_db_environment: str) -> bool: 224 return self._save_to_copy(0, path_to_db_environment) 225 226 def open_databases(self, db_names: Dict[DbId, DbName]) -> None: 227 return self._save_to_copy(1, db_names) 228 229 def drop_db(self, db_id: DbId, delete: bool = False) -> None: 230 return self._save_to_copy(2, db_id, delete) 231 232 def sync(self) -> None: 233 return self._save_to_copy(3) 234 235 def get(self, key: InputKeyType, db_id: DbId = None) -> ValueType: 236 return self._save_to_copy(4, key, db_id) 237 238 def get_first(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]: 239 # Returns first item in DB 240 return self._save_to_copy(14, db_id) 241 242 def get_last(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]: 243 # Returns last item in DB 244 return self._save_to_copy(15, db_id) 245 246 def get_items(self, db_keys: Sequence[InputKeyType], db_id: DbId = None) -> List[Tuple[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]: 247 return self._save_to_copy(5, db_keys, db_id) 248 249 def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 250 return self._save_to_copy(16, desired_key, num, db_id, reverse=False) 251 252 def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 253 return self._save_to_copy(16, desired_key, num, db_id, reverse=True) 254 255 def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 256 return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=False) 257 258 def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 259 return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=True) 260 261 def get_all_items(self, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 262 # Returns all items from DB 263 return self._save_to_copy(6, db_id) 264 265 def put(self, key: InputKeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> RawValueType: 266 return self._save_to_copy(7, key, value, db_id) 267 268 def put_items(self, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 269 return self._save_to_copy(8, db_items, db_id) 270 271 def delete(self, key: InputKeyType, db_id: DbId = None) -> RawValueType: 272 return self._save_to_copy(9, key, db_id) 273 274 def delete_kv(self, key: InputKeyType, value: ValueType, db_id: DbId = None) -> RawValueType: 275 return self._save_to_copy(10, key, value, db_id) 276 277 def delete_items(self, db_items: Set[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 278 return self._save_to_copy(11, db_items, db_id) 279 280 def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 281 return self._save_to_copy(12, db_items, db_id) 282 283 def open_db_environment(self, path_to_db_environment: str) -> EnvId: 284 return self._save_to_copy(13, path_to_db_environment) 285 286 def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None: 287 # Lock all databases if db_names is None. Databases will be released automatically wnen coroutine execution will be finished 288 return self._save_to_copy(18, db_names) 289 290 def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool: 291 # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished 292 return self._save_to_copy(19, db_names) 293 294 def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None: 295 # Unlock all databases if db_names is None 296 return self._save_to_copy(18, db_names) 297 298 299class Db(Service, EntityStatsMixin): 300 def __init__(self, loop: CoroSchedulerType): 301 super(Db, self).__init__(loop) 302 self.default_db_id: DbName = b'__default__' 303 self.default_env_name: str = '__default__.dbenv' 304 # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict() 305 # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict() 306 self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict() 307 # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list() 308 self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict() 309 self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict() 310 # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list() 311 self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 312 # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict() 313 self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict() 314 self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 315 # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict() 316 self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 317 self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 318 self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict() 319 self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict() 320 self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list() 321 self.path_to_default_db_environment: str = None 322 self.app_name_waiter: CoroWrapperBase = None 323 self.default_db_environment: lmdb.Environment = None 324 self.db_environments: Dict[EnvId, EnvInfo] = dict() 325 # self.databases: Dict[Hashable, Any] = dict() 326 # self.db_names: Dict[DbId, DbName] = dict() 327 self.async_loop = None 328 self.sync_time_interval = 1.0 329 self.last_sync_time = perf_counter() 330 self.force_sync: Set[EnvId] = set() 331 self.write_locked: Set[EnvId] = set() 332 self.writes_num: int = 0 333 self.reads_num: int = 0 334 self.deletes_num: int = 0 335 self.db_drops_num: int = 0 336 self.write_locked_coro_id: Set[CoroID] = set() 337 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 338 # Tags.can_use_bytes, 339 # Tags.decode_str_as_str, 340 # Tags.decode_list_as_list, 341 # Tags.decode_bytes_as_bytes, 342 # Tags.superficial, 343 # Tags.current_platform, 344 # Tags.multi_platform), 345 # TestDataType.small, 346 # 0.1) 347 self.serializer = Serializer(Serializers.msgspec_messagepack) 348 349 self._request_workers = { 350 0: self._on_set_default_db_environment_path, 351 1: self._on_open_databases, 352 2: self._on_drop_db, 353 3: self._on_sync, 354 4: self._on_get, 355 5: self._on_get_items, 356 6: self._on_get_all_items, 357 7: self._on_put, 358 8: self._on_put_items, 359 9: self._on_delete, 360 10: self._on_delete_kv, 361 11: self._on_delete_items, 362 12: self._on_delete_kv_items, 363 13: self._on_open_db_environment, 364 14: self._on_get_first, 365 15: self._on_get_last, 366 16: self._on_get_n_items, 367 17: self._on_get_items_range, 368 } 369 370 # TODO: sync with last implementation 371 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 372 return type(self).__name__, { 373 'db_names': list(self.db_names.keys()), 374 'writes num': self.writes_num, 375 'reads num': self.reads_num, 376 'deletes num': self.deletes_num, 377 'db drops num': self.db_drops_num, 378 } 379 380 def norm_key(self, key: InputKeyType) -> NormalizedKeyType: 381 return normalize_compound_key(key) 382 383 def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType: 384 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 385 if isinstance(env_or_id, lmdb.Environment): 386 env = env_or_id 387 else: 388 env = self.db_environments[env_or_id].env 389 390 if len(raw_key) > env.max_key_size(): 391 raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}') 392 393 def single_task_registration_or_immediate_processing( 394 self, *args, **kwargs) -> ServiceProcessingResponse: 395 result = self.try_resolve_request(*args, **kwargs) 396 if result is None: 397 return True, None, None 398 else: 399 return result 400 401 def full_processing_iteration(self): 402 if self.default_db_environment is None: 403 if self.path_to_default_db_environment is None: 404 if self.app_name_waiter is None: 405 async def coro(i: Interface, self: 'Db'): 406 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 407 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 408 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 409 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 410 self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name) 411 self.app_name_waiter = None 412 self.make_live() 413 414 self.app_name_waiter = put_root_from_other_service(self, coro, self) 415 self.app_name_waiter.is_background_coro = True 416 417 self.make_dead() 418 return 419 else: 420 self._init_default_db() 421 422 if self.force_sync: 423 envs_need_to_be_sync: Set[DbId] = self.force_sync 424 self.force_sync = set() 425 else: 426 envs_need_to_be_sync = set() 427 428 data_cache_buff = self.data_cache 429 self.data_cache = type(data_cache_buff)() 430 431 read_queue_buff = self.read_queue 432 self.read_queue = type(read_queue_buff)() 433 434 massive_read_queue_buff = self.massive_read_queue 435 self.massive_read_queue = type(massive_read_queue_buff)() 436 437 kv_deletion_cache_buff = self.kv_deletion_cache 438 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 439 440 get_all_items_queue_buff = self.get_all_items_queue 441 self.get_all_items_queue = type(get_all_items_queue_buff)() 442 443 get_first_queue_buff = self.get_first_queue 444 self.get_first_queue = type(get_first_queue_buff)() 445 446 get_last_queue_buff = self.get_last_queue 447 self.get_last_queue = type(get_last_queue_buff)() 448 449 get_n_items_queue_buff = self.get_n_items_queue 450 self.get_n_items_queue = type(get_n_items_queue_buff)() 451 452 get_items_range_queue_buff = self.get_items_range_queue 453 self.get_items_range_queue = type(get_items_range_queue_buff)() 454 455 # put 456 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]): 457 try: 458 with env_info.env.begin(write=True) as txn: 459 for db_id, db_put_info in put_info.items(): 460 if db_id in env_info.databases: 461 for raw_key, value in db_put_info.items(): 462 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False) 463 464 self.writes_num += len(db_put_info) 465 except lmdb.MapFullError: 466 raise DBError.from_exception(db_id) 467 468 for env_id, put_info in data_cache_buff.items(): 469 if env_id in self.db_environments: 470 envs_need_to_be_sync.add(env_id) 471 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 472 473 # TODO: implement delete* methods processing 474 # delete 475 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 476 if env_id in self.db_environments: 477 envs_need_to_be_sync.add(env_id) 478 ... 479 480 for key_info, value in kv_deletion_cache_buff.items(): 481 with self.default_db_environment.begin(write=True) as txn: 482 key, db_id = key_info 483 txn.delete(key, value, db=self.databases[db_id]) 484 self.deletes_num += 1 485 486 # drop 487 drop_db_requests_buff = self.drop_db_requests 488 self.drop_db_requests = type(drop_db_requests_buff)() 489 490 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]): 491 for db_id, db_drop_info in drop_info.items(): 492 delete_db, coro_id = db_drop_info 493 if db_id in env_info.databases: 494 try: 495 with env_info.env.begin(write=True) as txn: 496 txn.drop(db=env_info.databases[db_id], delete=delete_db) 497 if delete_db: 498 del env_info.databases[db_id] 499 del env_info.db_names[db_id] 500 501 self.db_drops_num += 1 502 except lmdb.MapFullError: 503 raise DBError.from_exception(db_id) 504 505 self.register_response(coro_id, None, UnknownEnvError(env_id)) 506 else: 507 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 508 509 for env_id, drop_info in drop_db_requests_buff.items(): 510 if env_id in self.db_environments: 511 envs_need_to_be_sync.add(env_id) 512 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 513 else: 514 for db_id, db_drop_info in drop_info.items(): 515 delete_db, coro_id = db_drop_info 516 self.register_response(coro_id, None, UnknownEnvError(env_id)) 517 518 # get 519 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]: 520 key, db_id, env_id = key_info 521 need_to_get_from_db = True 522 try: 523 value = data_cache_buff[env_id][db_id][key] 524 need_to_get_from_db = False 525 except KeyError: 526 pass 527 528 if need_to_get_from_db: 529 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 530 self.reads_num += 1 531 532 exception = None 533 try: 534 if value is None: 535 exception = DbKeyError(key_info) 536 else: 537 value = self.serializer.loads(value) 538 except: 539 exception = get_exception() 540 541 return value, exception 542 543 # _on_get 544 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 545 if env_id in self.db_environments: 546 env_info = self.db_environments[env_id] 547 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 548 if db_id in env_info.databases: 549 with env_info.env.begin() as txn: 550 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 551 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 552 for coro_id in coro_ids: 553 self.register_response(coro_id, value, exception) 554 else: 555 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 556 for coro_id in coro_ids: 557 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 558 else: 559 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 560 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 561 for coro_id in coro_ids: 562 self.register_response(coro_id, None, UnknownEnvError(env_id)) 563 564 # _on_get_items 565 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 566 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 567 if env_id in self.db_environments: 568 env_info = self.db_environments[env_id] 569 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 570 if coro_id not in results: 571 results[coro_id] = dict() 572 573 coro_results = results[coro_id] 574 for db_id, raw_keys in read_queue_buff_db_info.items(): 575 if db_id not in coro_results: 576 coro_results[db_id] = dict() 577 578 coro_db_results = coro_results[db_id] 579 if db_id in env_info.databases: 580 with env_info.env.begin() as txn: 581 for raw_key in raw_keys: 582 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 583 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 584 else: 585 for coro_id in coro_ids: 586 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 587 else: 588 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 589 for db_id, raw_keys in read_queue_buff_db_info.items(): 590 for coro_id in coro_ids: 591 self.register_response(coro_id, None, UnknownEnvError(env_id)) 592 593 for coro_id, coro_results in results.items(): 594 self.register_response(coro_id, coro_results, None) 595 596 # get all items 597 for coro_id, db_id, env_id in get_all_items_queue_buff: 598 if env_id in self.db_environments: 599 env_info = self.db_environments[env_id] 600 env = env_info.env 601 if db_id in env_info.databases: 602 db = env_info.databases[db_id] 603 with env.begin(db=db) as txn: 604 result = list() 605 exception = None 606 try: 607 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()] 608 self.reads_num += len(result) 609 except: 610 exception = get_exception() 611 612 self.register_response(coro_id, result, exception) 613 else: 614 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 615 else: 616 self.register_response(coro_id, None, UnknownEnvError(env_id)) 617 618 # get_first 619 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 620 if env_id in self.db_environments: 621 env_info = self.db_environments[env_id] 622 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 623 if db_id in env_info.databases: 624 db = env_info.databases[db_id] 625 with env_info.env.begin(db=db) as txn: 626 result = None 627 exception = None 628 try: 629 cursor: lmdb.Cursor = txn.cursor() 630 if cursor.first(): 631 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 632 self.reads_num += 1 633 else: 634 exception = KeyError() 635 except: 636 exception = get_exception() 637 638 for coro_id in coro_ids: 639 self.register_response(coro_id, result, exception) 640 else: 641 for coro_id in coro_ids: 642 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 643 else: 644 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 645 for coro_id in coro_ids: 646 self.register_response(coro_id, None, UnknownEnvError(env_id)) 647 648 # get_last 649 for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items(): 650 if env_id in self.db_environments: 651 env_info = self.db_environments[env_id] 652 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 653 if db_id in env_info.databases: 654 db = env_info.databases[db_id] 655 with env_info.env.begin(db=db) as txn: 656 result = None 657 exception = None 658 try: 659 cursor: lmdb.Cursor = txn.cursor() 660 if cursor.first(): 661 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 662 self.reads_num += 1 663 else: 664 exception = KeyError() 665 except: 666 exception = get_exception() 667 668 for coro_id in coro_ids: 669 self.register_response(coro_id, result, exception) 670 else: 671 for coro_id in coro_ids: 672 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 673 else: 674 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 675 for coro_id in coro_ids: 676 self.register_response(coro_id, None, UnknownEnvError(env_id)) 677 678 # get_n_items 679 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 680 if env_id in self.db_environments: 681 env_info = self.db_environments[env_id] 682 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 683 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 684 db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info 685 if db_id in env_info.databases: 686 db = env_info.databases[db_id] 687 with env_info.env.begin(db=db) as txn: 688 result = None 689 exception = None 690 try: 691 cursor: lmdb.Cursor = txn.cursor() 692 if cursor.set_range(first_desired_raw_key): 693 if reverse: 694 cursor_iterator = cursor.iterprev(keys=True, values=True) 695 else: 696 cursor_iterator = cursor.iternext(keys=True, values=True) 697 698 for raw_key, raw_value in cursor_iterator: 699 if (num_items is not None) and (num_items <= 0): 700 break 701 702 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 703 self.reads_num += 1 704 if num_items is not None: 705 num_items -= 1 706 else: 707 exception = KeyError() 708 except: 709 exception = get_exception() 710 711 self.register_response(coro_id, result, exception) 712 else: 713 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 714 else: 715 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 716 for coro_id in coro_ids: 717 self.register_response(coro_id, None, UnknownEnvError(env_id)) 718 719 # get_items_range 720 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 721 if env_id in self.db_environments: 722 env_info = self.db_environments[env_id] 723 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 724 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 725 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 726 if db_id in env_info.databases: 727 db = env_info.databases[db_id] 728 with env_info.env.begin(db=db) as txn: 729 result = None 730 exception = None 731 try: 732 cursor: lmdb.Cursor = txn.cursor() 733 if cursor.set_range(first_desired_raw_key): 734 if reverse: 735 cursor_iterator = cursor.iterprev(keys=True, values=True) 736 else: 737 cursor_iterator = cursor.iternext(keys=True, values=True) 738 739 for raw_key, raw_value in cursor_iterator: 740 if reverse: 741 if raw_key < last_desired_raw_key: 742 break 743 else: 744 if raw_key > last_desired_raw_key: 745 break 746 747 if (num_items is not None) and (num_items <= 0): 748 break 749 750 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 751 self.reads_num += 1 752 if num_items is not None: 753 num_items -= 1 754 else: 755 exception = KeyError() 756 except: 757 exception = get_exception() 758 759 self.register_response(coro_id, result, exception) 760 else: 761 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 762 else: 763 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 764 for coro_id in coro_ids: 765 self.register_response(coro_id, None, UnknownEnvError(env_id)) 766 767 # sync 768 if envs_need_to_be_sync: 769 for env_id in envs_need_to_be_sync: 770 self.sync_in_thread_pool(env_id) 771 772 self.last_sync_time = perf_counter() 773 774 self.make_dead() 775 776 def in_work(self) -> bool: 777 result: bool = (bool(self.get_first_queue) or bool(self.get_last_queue) or bool(self.get_n_items_queue) or bool(self.get_items_range_queue) or bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)) or (bool(self.force_sync) or ((not bool(self.write_locked)) and (bool(self.data_cache) or bool(self.kv_deletion_cache) or bool(self.drop_db_requests)) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 778 return self.thrifty_in_work(result) 779 780 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 781 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 782 if self.sync_time_interval > time_since_last_sync_time: 783 return True, self.sync_time_interval - time_since_last_sync_time 784 else: 785 return True, 0 786 787 def _init_db(self, db_id: DbId): 788 # print(f'{self.path_to_default_db_environment=}') 789 if db_id in self.db_environments: 790 return self.db_environments[db_id] 791 792 env_init_info: EnvInitInfo = EnvInitInfo( 793 db_id, 794 map_size=20 * 1024**2, 795 writemap=True, 796 max_dbs=10, 797 map_async=True, 798 lock=False, 799 metasync=False, 800 sync=False, 801 meminit=False, 802 ) 803 env_info: EnvInfo = EnvInfo(env_init_info, lmdb.open(*env_init_info.args, **env_init_info.kwargs)) 804 db_environment = env_info.env 805 self.db_environments[env_info.env_id] = env_info 806 env_info.databases[self.default_db_id] = env_info.databases[None] = db_environment.open_db(self.default_db_id) 807 db_environment.sync(True) 808 809 def _init_default_db(self): 810 env_info: EnvInfo = self._init_db(self.path_to_default_db_environment) 811 self.db_environments[None] = env_info 812 813 def _on_open_db_environment(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse: 814 result: EnvInfo = None 815 try: 816 result = self._init_db(path_to_db_environment) 817 except: 818 exception = get_exception() 819 return True, result, exception 820 821 return True, result.env_id, None 822 823 def _on_set_default_db_environment_path(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse: 824 if {None, path_to_db_environment} & self.write_locked: 825 return True, False, None 826 827 if self.default_db_environment is None: 828 self.path_to_default_db_environment = path_to_db_environment 829 try: 830 self._init_default_db() 831 except: 832 exception = get_exception() 833 return True, False, exception 834 return True, True, None 835 else: 836 return True, False, None 837 838 def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse: 839 if self.data_cache: 840 self.force_sync.add(request.env_id) 841 self.make_live() 842 else: 843 # self.default_db_environment.sync(True) 844 self.sync_in_thread_pool(request.env_id) 845 846 return True, None, None 847 848 def _on_get(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse: 849 coro_id = self.current_caller_coro_info.coro_id 850 key = self.serializer.dumps(normalize_compound_key(key)) 851 env_id = request.env_id 852 db_id = request._get_db_id(db_id) 853 854 if env_id in self.data_cache: 855 env_cache = self.data_cache[env_id] 856 if db_id in env_cache: 857 db_cache = env_cache[db_id] 858 if key in db_cache: 859 return True, db_cache[key], None 860 861 if env_id not in self.read_queue: 862 self.read_queue[env_id] = dict() 863 864 read_queue_env = self.read_queue[env_id] 865 if db_id not in read_queue_env: 866 read_queue_env[db_id] = dict() 867 868 read_queue_env_db = read_queue_env[db_id] 869 if key not in read_queue_env_db: 870 read_queue_env_db[key] = set() 871 872 read_queue_env_db_key = read_queue_env_db[key] 873 read_queue_env_db_key.add(coro_id) 874 self.make_live() 875 return False, None, None 876 877 def _on_get_first(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse: 878 coro_id = self.current_caller_coro_info.coro_id 879 env_id = request.env_id 880 db_id = request._get_db_id(db_id) 881 882 if env_id not in self.get_first_queue: 883 self.get_first_queue[env_id] = dict() 884 885 get_first_queue_env = self.get_first_queue[env_id] 886 if db_id not in get_first_queue_env: 887 get_first_queue_env[db_id] = set() 888 889 get_first_queue_env_db = get_first_queue_env[db_id] 890 get_first_queue_env_db.add(coro_id) 891 892 self.make_live() 893 return False, None, None 894 895 def _on_get_last(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse: 896 coro_id = self.current_caller_coro_info.coro_id 897 env_id = request.env_id 898 db_id = request._get_db_id(db_id) 899 900 if env_id not in self.get_last_queue: 901 self.get_last_queue[env_id] = dict() 902 903 get_last_queue_env = self.get_last_queue[env_id] 904 if db_id not in get_last_queue_env: 905 get_last_queue_env[db_id] = set() 906 907 get_last_queue_env_db = get_last_queue_env[db_id] 908 get_last_queue_env_db.add(coro_id) 909 910 self.make_live() 911 return False, None, None 912 913 def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 914 coro_id = self.current_caller_coro_info.coro_id 915 desired_key = self.serializer.dumps(normalize_compound_key(desired_key)) 916 db_id = request._get_db_id(db_id) 917 env_id = request.env_id 918 919 if env_id not in self.get_n_items_queue: 920 self.get_n_items_queue[env_id] = dict() 921 922 get_n_items_queue_env = self.get_n_items_queue[env_id] 923 get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse) 924 self.make_live() 925 return False, None, None 926 927 def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 928 coro_id = self.current_caller_coro_info.coro_id 929 first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key)) 930 last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key)) 931 db_id = request._get_db_id(db_id) 932 env_id = request.env_id 933 934 if env_id not in self.get_items_range_queue: 935 self.get_items_range_queue[env_id] = dict() 936 937 get_items_range_queue_env = self.get_items_range_queue[env_id] 938 get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse) 939 self.make_live() 940 return False, None, None 941 942 def _on_get_items(self, request: DbRequest, db_keys: Union[Set[InputKeyType], Dict[DbId, Set[InputKeyType]]]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]: 943 coro_id = self.current_caller_coro_info.coro_id 944 env_id = request.env_id 945 946 if env_id not in self.massive_read_queue: 947 self.massive_read_queue[env_id] = dict() 948 949 massive_read_queue_env = self.massive_read_queue[env_id] 950 if coro_id not in massive_read_queue_env: 951 massive_read_queue_env[coro_id] = dict() 952 953 massive_read_queue_env_coro = massive_read_queue_env[coro_id] 954 if request.db_id is not None: 955 db_keys = {request.db_id: db_keys} 956 957 for db_id, keys in db_keys.items(): 958 if db_id not in massive_read_queue_env_coro: 959 massive_read_queue_env_coro[db_id] = set() 960 961 massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id] 962 for key in keys: 963 massive_read_queue_env_coro_db.add(self.serializer.dumps(normalize_compound_key(key))) 964 965 self.make_live() 966 return False, None, None 967 968 def _on_get_all_items(self, request: DbRequest, db_id: DbId) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 969 db_id = request._get_db_id(db_id) 970 coro_id = self.current_caller_coro_info.coro_id 971 self.get_all_items_queue.append((coro_id, db_id, request.env_id)) 972 self.make_live() 973 return False, None, None 974 975 def _on_put(self, request: DbRequest, key: KeyType, value: Any, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]: 976 key = self.serializer.dumps(normalize_compound_key(key)) 977 env_id = request.env_id 978 db_id = request._get_db_id(db_id) 979 980 exception = None 981 result = None 982 try: 983 if env_id not in self.data_cache: 984 self.data_cache[env_id] = dict() 985 986 env_data_cache = self.data_cache[env_id] 987 if db_id not in env_data_cache: 988 env_data_cache[db_id] = dict() 989 990 env_data_cache_db = env_data_cache[db_id] 991 result = env_data_cache_db[key] = self.serializer.dumps(value) 992 except: 993 exception = get_exception() 994 995 self.make_live() 996 return True, result, exception 997 998 def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 999 result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict() 1000 env_id = request.env_id 1001 db_id = request._get_db_id(db_id) 1002 1003 if env_id not in self.data_cache: 1004 self.data_cache[env_id] = dict() 1005 1006 env_data_cache = self.data_cache[env_id] 1007 if db_id not in result_items: 1008 result_items[db_id] = dict() 1009 1010 result_db_items = result_items[db_id] 1011 1012 if db_id not in env_data_cache: 1013 env_data_cache[db_id] = dict() 1014 1015 env_data_cache_db = env_data_cache[db_id] 1016 for key, value in db_items.items(): 1017 key = self.serializer.dumps(normalize_compound_key(key)) 1018 1019 exception = None 1020 result = None 1021 try: 1022 result = env_data_cache_db[key] = self.serializer.dumps(value) 1023 except: 1024 exception = get_exception() 1025 1026 result_db_items[key] = (result, exception) 1027 1028 self.make_live() 1029 return True, result_items, None 1030 1031 def _on_delete(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> Tuple[bool, None, Optional[Exception]]: 1032 key = self.serializer.dumps(normalize_compound_key(key)) 1033 db_id = request._get_db_id(db_id) 1034 env_id = request.env_id 1035 1036 exception = None 1037 result = None 1038 try: 1039 if env_id not in self.deletion_cache: 1040 self.deletion_cache[env_id] = dict() 1041 1042 env_deletion_cache = self.deletion_cache[env_id] 1043 if db_id not in env_deletion_cache: 1044 env_deletion_cache[db_id] = set() 1045 1046 env_deletion_cache_db = env_deletion_cache[db_id] 1047 env_deletion_cache_db.add(key) 1048 except: 1049 exception = get_exception() 1050 1051 self.make_live() 1052 return True, result, exception 1053 1054 def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]: 1055 key = self.serializer.dumps(normalize_compound_key(key)) 1056 db_id = request._get_db_id(db_id) 1057 env_id = request.env_id 1058 1059 exception = None 1060 result = None 1061 try: 1062 if env_id not in self.kv_deletion_cache: 1063 self.kv_deletion_cache[env_id] = dict() 1064 1065 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1066 if db_id not in env_kv_deletion_cache: 1067 env_kv_deletion_cache[db_id] = dict() 1068 1069 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1070 result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value) 1071 except: 1072 exception = get_exception() 1073 1074 self.make_live() 1075 return True, result, exception 1076 1077 def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]: 1078 result_items: Dict[DbId, Dict[RawKeyType, Optional[Exception]]] = dict() 1079 env_id = request.env_id 1080 1081 if env_id not in self.deletion_cache: 1082 self.deletion_cache[env_id] = dict() 1083 1084 env_deletion_cache = self.deletion_cache[env_id] 1085 if db_id not in result_items: 1086 result_items[db_id] = dict() 1087 1088 result_db_items = result_items[db_id] 1089 1090 if db_id not in env_deletion_cache: 1091 env_deletion_cache[db_id] = set() 1092 1093 env_deletion_cache_db = env_deletion_cache[db_id] 1094 for key in db_items: 1095 key = self.serializer.dumps(normalize_compound_key(key)) 1096 1097 exception = None 1098 try: 1099 env_deletion_cache_db.add(key) 1100 except: 1101 exception = get_exception() 1102 1103 result_db_items[key] = exception 1104 1105 self.make_live() 1106 return True, result_items, None 1107 1108 def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 1109 result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict() 1110 env_id = request.env_id 1111 1112 if env_id not in self.kv_deletion_cache: 1113 self.kv_deletion_cache[env_id] = dict() 1114 1115 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1116 if db_id not in result_items: 1117 result_items[db_id] = dict() 1118 1119 result_db_items = result_items[db_id] 1120 1121 if db_id not in env_kv_deletion_cache: 1122 env_kv_deletion_cache[db_id] = dict() 1123 1124 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1125 for key, value in db_items.items(): 1126 exception = None 1127 result = None 1128 try: 1129 key = self.serializer.dumps(normalize_compound_key(key)) 1130 result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value) 1131 except: 1132 exception = get_exception() 1133 1134 result_db_items[key] = (result, exception) 1135 1136 self.make_live() 1137 return True, result_items, None 1138 1139 def _on_open_databases(self, request: DbRequest, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse: 1140 exception = None 1141 try: 1142 env_info: EnvInfo = self.db_environments[request.env_id] 1143 except KeyError: 1144 exception = UnknownEnvError(request.env_id) 1145 1146 for db_id, db_name in db_names.items(): 1147 env_info.databases[db_id] = env_info.env.open_db(db_name) 1148 env_info.db_names[db_id] = db_name 1149 1150 env_info.env.sync(True) 1151 return True, None, exception 1152 1153 def _on_drop_db(self, request: DbRequest, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse: 1154 coro_id = self.current_caller_coro_info.coro_id 1155 env_id = request.env_id 1156 1157 if env_id not in self.drop_db_requests: 1158 self.drop_db_requests[env_id] = dict() 1159 1160 drop_db_requests_env = self.drop_db_requests[env_id] 1161 1162 if db_id not in drop_db_requests_env: 1163 drop_db_requests_env[db_id] = [False, set()] 1164 1165 drop_db_requests_env_db = drop_db_requests_env[db_id] 1166 1167 if delete: 1168 drop_db_requests_env_db[0] = delete 1169 1170 drop_db_requests_env_db[1].add(coro_id) 1171 self.make_live() 1172 return False, None, None 1173 1174 def sync_in_thread_pool(self, env_id: EnvId = None): 1175 async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool): 1176 if need_to_ensure_asyncio_loop: 1177 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True)) 1178 else: 1179 if asyncio_loop is None: 1180 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get()) 1181 1182 async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment): 1183 def sync_worker(): 1184 env.sync(True) 1185 self.write_locked.discard(env_id) 1186 1187 await task_in_thread_pool(asyncio_loop, sync_worker) 1188 1189 env: lmdb.Environment = self.db_environments[env_id].env 1190 await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env))) 1191 self.write_locked_coro_id.discard(i.coro_id) 1192 def make_service_live_for_a_next_sync(self: 'Db'): 1193 self.make_live() 1194 1195 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 1196 1197 asyncio_loop = None 1198 need_to_ensure_asyncio_loop = False 1199 try: 1200 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 1201 except AsyncioLoopWasNotSetError: 1202 need_to_ensure_asyncio_loop = True 1203 1204 coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, env_id, self, asyncio_loop, need_to_ensure_asyncio_loop) 1205 coro.is_background_coro = True 1206 self.write_locked.add(env_id) 1207 self.write_locked_coro_id.add(coro.coro_id) 1208 1209 1210DbRequest.default_service_type = Db 1211 1212 1213def lmdb_reapplier(env_info: EnvInfo, handler: Callable, *args, **kwargs): 1214 environment: lmdb.Environment = env_info.env 1215 databases: Dict[Hashable, Any] = env_info.databases 1216 failed = True 1217 while failed: 1218 need_to_resize: bool = False 1219 try: 1220 handler(environment, databases, *args, **kwargs) 1221 failed = False 1222 except DBError as err: 1223 if isinstance(err.original_exception, lmdb.MapFullError): 1224 need_to_resize = True 1225 1226 if need_to_resize: 1227 environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
class
Db(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
300class Db(Service, EntityStatsMixin): 301 def __init__(self, loop: CoroSchedulerType): 302 super(Db, self).__init__(loop) 303 self.default_db_id: DbName = b'__default__' 304 self.default_env_name: str = '__default__.dbenv' 305 # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict() 306 # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict() 307 self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict() 308 # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list() 309 self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict() 310 self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict() 311 # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list() 312 self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 313 # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict() 314 self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict() 315 self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 316 # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict() 317 self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 318 self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 319 self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict() 320 self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict() 321 self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list() 322 self.path_to_default_db_environment: str = None 323 self.app_name_waiter: CoroWrapperBase = None 324 self.default_db_environment: lmdb.Environment = None 325 self.db_environments: Dict[EnvId, EnvInfo] = dict() 326 # self.databases: Dict[Hashable, Any] = dict() 327 # self.db_names: Dict[DbId, DbName] = dict() 328 self.async_loop = None 329 self.sync_time_interval = 1.0 330 self.last_sync_time = perf_counter() 331 self.force_sync: Set[EnvId] = set() 332 self.write_locked: Set[EnvId] = set() 333 self.writes_num: int = 0 334 self.reads_num: int = 0 335 self.deletes_num: int = 0 336 self.db_drops_num: int = 0 337 self.write_locked_coro_id: Set[CoroID] = set() 338 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 339 # Tags.can_use_bytes, 340 # Tags.decode_str_as_str, 341 # Tags.decode_list_as_list, 342 # Tags.decode_bytes_as_bytes, 343 # Tags.superficial, 344 # Tags.current_platform, 345 # Tags.multi_platform), 346 # TestDataType.small, 347 # 0.1) 348 self.serializer = Serializer(Serializers.msgspec_messagepack) 349 350 self._request_workers = { 351 0: self._on_set_default_db_environment_path, 352 1: self._on_open_databases, 353 2: self._on_drop_db, 354 3: self._on_sync, 355 4: self._on_get, 356 5: self._on_get_items, 357 6: self._on_get_all_items, 358 7: self._on_put, 359 8: self._on_put_items, 360 9: self._on_delete, 361 10: self._on_delete_kv, 362 11: self._on_delete_items, 363 12: self._on_delete_kv_items, 364 13: self._on_open_db_environment, 365 14: self._on_get_first, 366 15: self._on_get_last, 367 16: self._on_get_n_items, 368 17: self._on_get_items_range, 369 } 370 371 # TODO: sync with last implementation 372 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 373 return type(self).__name__, { 374 'db_names': list(self.db_names.keys()), 375 'writes num': self.writes_num, 376 'reads num': self.reads_num, 377 'deletes num': self.deletes_num, 378 'db drops num': self.db_drops_num, 379 } 380 381 def norm_key(self, key: InputKeyType) -> NormalizedKeyType: 382 return normalize_compound_key(key) 383 384 def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType: 385 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 386 if isinstance(env_or_id, lmdb.Environment): 387 env = env_or_id 388 else: 389 env = self.db_environments[env_or_id].env 390 391 if len(raw_key) > env.max_key_size(): 392 raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}') 393 394 def single_task_registration_or_immediate_processing( 395 self, *args, **kwargs) -> ServiceProcessingResponse: 396 result = self.try_resolve_request(*args, **kwargs) 397 if result is None: 398 return True, None, None 399 else: 400 return result 401 402 def full_processing_iteration(self): 403 if self.default_db_environment is None: 404 if self.path_to_default_db_environment is None: 405 if self.app_name_waiter is None: 406 async def coro(i: Interface, self: 'Db'): 407 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 408 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 409 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 410 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 411 self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name) 412 self.app_name_waiter = None 413 self.make_live() 414 415 self.app_name_waiter = put_root_from_other_service(self, coro, self) 416 self.app_name_waiter.is_background_coro = True 417 418 self.make_dead() 419 return 420 else: 421 self._init_default_db() 422 423 if self.force_sync: 424 envs_need_to_be_sync: Set[DbId] = self.force_sync 425 self.force_sync = set() 426 else: 427 envs_need_to_be_sync = set() 428 429 data_cache_buff = self.data_cache 430 self.data_cache = type(data_cache_buff)() 431 432 read_queue_buff = self.read_queue 433 self.read_queue = type(read_queue_buff)() 434 435 massive_read_queue_buff = self.massive_read_queue 436 self.massive_read_queue = type(massive_read_queue_buff)() 437 438 kv_deletion_cache_buff = self.kv_deletion_cache 439 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 440 441 get_all_items_queue_buff = self.get_all_items_queue 442 self.get_all_items_queue = type(get_all_items_queue_buff)() 443 444 get_first_queue_buff = self.get_first_queue 445 self.get_first_queue = type(get_first_queue_buff)() 446 447 get_last_queue_buff = self.get_last_queue 448 self.get_last_queue = type(get_last_queue_buff)() 449 450 get_n_items_queue_buff = self.get_n_items_queue 451 self.get_n_items_queue = type(get_n_items_queue_buff)() 452 453 get_items_range_queue_buff = self.get_items_range_queue 454 self.get_items_range_queue = type(get_items_range_queue_buff)() 455 456 # put 457 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]): 458 try: 459 with env_info.env.begin(write=True) as txn: 460 for db_id, db_put_info in put_info.items(): 461 if db_id in env_info.databases: 462 for raw_key, value in db_put_info.items(): 463 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False) 464 465 self.writes_num += len(db_put_info) 466 except lmdb.MapFullError: 467 raise DBError.from_exception(db_id) 468 469 for env_id, put_info in data_cache_buff.items(): 470 if env_id in self.db_environments: 471 envs_need_to_be_sync.add(env_id) 472 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 473 474 # TODO: implement delete* methods processing 475 # delete 476 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 477 if env_id in self.db_environments: 478 envs_need_to_be_sync.add(env_id) 479 ... 480 481 for key_info, value in kv_deletion_cache_buff.items(): 482 with self.default_db_environment.begin(write=True) as txn: 483 key, db_id = key_info 484 txn.delete(key, value, db=self.databases[db_id]) 485 self.deletes_num += 1 486 487 # drop 488 drop_db_requests_buff = self.drop_db_requests 489 self.drop_db_requests = type(drop_db_requests_buff)() 490 491 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]): 492 for db_id, db_drop_info in drop_info.items(): 493 delete_db, coro_id = db_drop_info 494 if db_id in env_info.databases: 495 try: 496 with env_info.env.begin(write=True) as txn: 497 txn.drop(db=env_info.databases[db_id], delete=delete_db) 498 if delete_db: 499 del env_info.databases[db_id] 500 del env_info.db_names[db_id] 501 502 self.db_drops_num += 1 503 except lmdb.MapFullError: 504 raise DBError.from_exception(db_id) 505 506 self.register_response(coro_id, None, UnknownEnvError(env_id)) 507 else: 508 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 509 510 for env_id, drop_info in drop_db_requests_buff.items(): 511 if env_id in self.db_environments: 512 envs_need_to_be_sync.add(env_id) 513 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 514 else: 515 for db_id, db_drop_info in drop_info.items(): 516 delete_db, coro_id = db_drop_info 517 self.register_response(coro_id, None, UnknownEnvError(env_id)) 518 519 # get 520 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]: 521 key, db_id, env_id = key_info 522 need_to_get_from_db = True 523 try: 524 value = data_cache_buff[env_id][db_id][key] 525 need_to_get_from_db = False 526 except KeyError: 527 pass 528 529 if need_to_get_from_db: 530 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 531 self.reads_num += 1 532 533 exception = None 534 try: 535 if value is None: 536 exception = DbKeyError(key_info) 537 else: 538 value = self.serializer.loads(value) 539 except: 540 exception = get_exception() 541 542 return value, exception 543 544 # _on_get 545 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 546 if env_id in self.db_environments: 547 env_info = self.db_environments[env_id] 548 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 549 if db_id in env_info.databases: 550 with env_info.env.begin() as txn: 551 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 552 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 553 for coro_id in coro_ids: 554 self.register_response(coro_id, value, exception) 555 else: 556 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 557 for coro_id in coro_ids: 558 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 559 else: 560 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 561 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 562 for coro_id in coro_ids: 563 self.register_response(coro_id, None, UnknownEnvError(env_id)) 564 565 # _on_get_items 566 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 567 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 568 if env_id in self.db_environments: 569 env_info = self.db_environments[env_id] 570 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 571 if coro_id not in results: 572 results[coro_id] = dict() 573 574 coro_results = results[coro_id] 575 for db_id, raw_keys in read_queue_buff_db_info.items(): 576 if db_id not in coro_results: 577 coro_results[db_id] = dict() 578 579 coro_db_results = coro_results[db_id] 580 if db_id in env_info.databases: 581 with env_info.env.begin() as txn: 582 for raw_key in raw_keys: 583 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 584 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 585 else: 586 for coro_id in coro_ids: 587 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 588 else: 589 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 590 for db_id, raw_keys in read_queue_buff_db_info.items(): 591 for coro_id in coro_ids: 592 self.register_response(coro_id, None, UnknownEnvError(env_id)) 593 594 for coro_id, coro_results in results.items(): 595 self.register_response(coro_id, coro_results, None) 596 597 # get all items 598 for coro_id, db_id, env_id in get_all_items_queue_buff: 599 if env_id in self.db_environments: 600 env_info = self.db_environments[env_id] 601 env = env_info.env 602 if db_id in env_info.databases: 603 db = env_info.databases[db_id] 604 with env.begin(db=db) as txn: 605 result = list() 606 exception = None 607 try: 608 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()] 609 self.reads_num += len(result) 610 except: 611 exception = get_exception() 612 613 self.register_response(coro_id, result, exception) 614 else: 615 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 616 else: 617 self.register_response(coro_id, None, UnknownEnvError(env_id)) 618 619 # get_first 620 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 621 if env_id in self.db_environments: 622 env_info = self.db_environments[env_id] 623 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 624 if db_id in env_info.databases: 625 db = env_info.databases[db_id] 626 with env_info.env.begin(db=db) as txn: 627 result = None 628 exception = None 629 try: 630 cursor: lmdb.Cursor = txn.cursor() 631 if cursor.first(): 632 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 633 self.reads_num += 1 634 else: 635 exception = KeyError() 636 except: 637 exception = get_exception() 638 639 for coro_id in coro_ids: 640 self.register_response(coro_id, result, exception) 641 else: 642 for coro_id in coro_ids: 643 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 644 else: 645 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 646 for coro_id in coro_ids: 647 self.register_response(coro_id, None, UnknownEnvError(env_id)) 648 649 # get_last 650 for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items(): 651 if env_id in self.db_environments: 652 env_info = self.db_environments[env_id] 653 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 654 if db_id in env_info.databases: 655 db = env_info.databases[db_id] 656 with env_info.env.begin(db=db) as txn: 657 result = None 658 exception = None 659 try: 660 cursor: lmdb.Cursor = txn.cursor() 661 if cursor.first(): 662 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 663 self.reads_num += 1 664 else: 665 exception = KeyError() 666 except: 667 exception = get_exception() 668 669 for coro_id in coro_ids: 670 self.register_response(coro_id, result, exception) 671 else: 672 for coro_id in coro_ids: 673 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 674 else: 675 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 676 for coro_id in coro_ids: 677 self.register_response(coro_id, None, UnknownEnvError(env_id)) 678 679 # get_n_items 680 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 681 if env_id in self.db_environments: 682 env_info = self.db_environments[env_id] 683 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 684 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 685 db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info 686 if db_id in env_info.databases: 687 db = env_info.databases[db_id] 688 with env_info.env.begin(db=db) as txn: 689 result = None 690 exception = None 691 try: 692 cursor: lmdb.Cursor = txn.cursor() 693 if cursor.set_range(first_desired_raw_key): 694 if reverse: 695 cursor_iterator = cursor.iterprev(keys=True, values=True) 696 else: 697 cursor_iterator = cursor.iternext(keys=True, values=True) 698 699 for raw_key, raw_value in cursor_iterator: 700 if (num_items is not None) and (num_items <= 0): 701 break 702 703 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 704 self.reads_num += 1 705 if num_items is not None: 706 num_items -= 1 707 else: 708 exception = KeyError() 709 except: 710 exception = get_exception() 711 712 self.register_response(coro_id, result, exception) 713 else: 714 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 715 else: 716 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 717 for coro_id in coro_ids: 718 self.register_response(coro_id, None, UnknownEnvError(env_id)) 719 720 # get_items_range 721 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 722 if env_id in self.db_environments: 723 env_info = self.db_environments[env_id] 724 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 725 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 726 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 727 if db_id in env_info.databases: 728 db = env_info.databases[db_id] 729 with env_info.env.begin(db=db) as txn: 730 result = None 731 exception = None 732 try: 733 cursor: lmdb.Cursor = txn.cursor() 734 if cursor.set_range(first_desired_raw_key): 735 if reverse: 736 cursor_iterator = cursor.iterprev(keys=True, values=True) 737 else: 738 cursor_iterator = cursor.iternext(keys=True, values=True) 739 740 for raw_key, raw_value in cursor_iterator: 741 if reverse: 742 if raw_key < last_desired_raw_key: 743 break 744 else: 745 if raw_key > last_desired_raw_key: 746 break 747 748 if (num_items is not None) and (num_items <= 0): 749 break 750 751 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 752 self.reads_num += 1 753 if num_items is not None: 754 num_items -= 1 755 else: 756 exception = KeyError() 757 except: 758 exception = get_exception() 759 760 self.register_response(coro_id, result, exception) 761 else: 762 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 763 else: 764 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 765 for coro_id in coro_ids: 766 self.register_response(coro_id, None, UnknownEnvError(env_id)) 767 768 # sync 769 if envs_need_to_be_sync: 770 for env_id in envs_need_to_be_sync: 771 self.sync_in_thread_pool(env_id) 772 773 self.last_sync_time = perf_counter() 774 775 self.make_dead() 776 777 def in_work(self) -> bool: 778 result: bool = (bool(self.get_first_queue) or bool(self.get_last_queue) or bool(self.get_n_items_queue) or bool(self.get_items_range_queue) or bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)) or (bool(self.force_sync) or ((not bool(self.write_locked)) and (bool(self.data_cache) or bool(self.kv_deletion_cache) or bool(self.drop_db_requests)) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 779 return self.thrifty_in_work(result) 780 781 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 782 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 783 if self.sync_time_interval > time_since_last_sync_time: 784 return True, self.sync_time_interval - time_since_last_sync_time 785 else: 786 return True, 0 787 788 def _init_db(self, db_id: DbId): 789 # print(f'{self.path_to_default_db_environment=}') 790 if db_id in self.db_environments: 791 return self.db_environments[db_id] 792 793 env_init_info: EnvInitInfo = EnvInitInfo( 794 db_id, 795 map_size=20 * 1024**2, 796 writemap=True, 797 max_dbs=10, 798 map_async=True, 799 lock=False, 800 metasync=False, 801 sync=False, 802 meminit=False, 803 ) 804 env_info: EnvInfo = EnvInfo(env_init_info, lmdb.open(*env_init_info.args, **env_init_info.kwargs)) 805 db_environment = env_info.env 806 self.db_environments[env_info.env_id] = env_info 807 env_info.databases[self.default_db_id] = env_info.databases[None] = db_environment.open_db(self.default_db_id) 808 db_environment.sync(True) 809 810 def _init_default_db(self): 811 env_info: EnvInfo = self._init_db(self.path_to_default_db_environment) 812 self.db_environments[None] = env_info 813 814 def _on_open_db_environment(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse: 815 result: EnvInfo = None 816 try: 817 result = self._init_db(path_to_db_environment) 818 except: 819 exception = get_exception() 820 return True, result, exception 821 822 return True, result.env_id, None 823 824 def _on_set_default_db_environment_path(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse: 825 if {None, path_to_db_environment} & self.write_locked: 826 return True, False, None 827 828 if self.default_db_environment is None: 829 self.path_to_default_db_environment = path_to_db_environment 830 try: 831 self._init_default_db() 832 except: 833 exception = get_exception() 834 return True, False, exception 835 return True, True, None 836 else: 837 return True, False, None 838 839 def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse: 840 if self.data_cache: 841 self.force_sync.add(request.env_id) 842 self.make_live() 843 else: 844 # self.default_db_environment.sync(True) 845 self.sync_in_thread_pool(request.env_id) 846 847 return True, None, None 848 849 def _on_get(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse: 850 coro_id = self.current_caller_coro_info.coro_id 851 key = self.serializer.dumps(normalize_compound_key(key)) 852 env_id = request.env_id 853 db_id = request._get_db_id(db_id) 854 855 if env_id in self.data_cache: 856 env_cache = self.data_cache[env_id] 857 if db_id in env_cache: 858 db_cache = env_cache[db_id] 859 if key in db_cache: 860 return True, db_cache[key], None 861 862 if env_id not in self.read_queue: 863 self.read_queue[env_id] = dict() 864 865 read_queue_env = self.read_queue[env_id] 866 if db_id not in read_queue_env: 867 read_queue_env[db_id] = dict() 868 869 read_queue_env_db = read_queue_env[db_id] 870 if key not in read_queue_env_db: 871 read_queue_env_db[key] = set() 872 873 read_queue_env_db_key = read_queue_env_db[key] 874 read_queue_env_db_key.add(coro_id) 875 self.make_live() 876 return False, None, None 877 878 def _on_get_first(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse: 879 coro_id = self.current_caller_coro_info.coro_id 880 env_id = request.env_id 881 db_id = request._get_db_id(db_id) 882 883 if env_id not in self.get_first_queue: 884 self.get_first_queue[env_id] = dict() 885 886 get_first_queue_env = self.get_first_queue[env_id] 887 if db_id not in get_first_queue_env: 888 get_first_queue_env[db_id] = set() 889 890 get_first_queue_env_db = get_first_queue_env[db_id] 891 get_first_queue_env_db.add(coro_id) 892 893 self.make_live() 894 return False, None, None 895 896 def _on_get_last(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse: 897 coro_id = self.current_caller_coro_info.coro_id 898 env_id = request.env_id 899 db_id = request._get_db_id(db_id) 900 901 if env_id not in self.get_last_queue: 902 self.get_last_queue[env_id] = dict() 903 904 get_last_queue_env = self.get_last_queue[env_id] 905 if db_id not in get_last_queue_env: 906 get_last_queue_env[db_id] = set() 907 908 get_last_queue_env_db = get_last_queue_env[db_id] 909 get_last_queue_env_db.add(coro_id) 910 911 self.make_live() 912 return False, None, None 913 914 def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 915 coro_id = self.current_caller_coro_info.coro_id 916 desired_key = self.serializer.dumps(normalize_compound_key(desired_key)) 917 db_id = request._get_db_id(db_id) 918 env_id = request.env_id 919 920 if env_id not in self.get_n_items_queue: 921 self.get_n_items_queue[env_id] = dict() 922 923 get_n_items_queue_env = self.get_n_items_queue[env_id] 924 get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse) 925 self.make_live() 926 return False, None, None 927 928 def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 929 coro_id = self.current_caller_coro_info.coro_id 930 first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key)) 931 last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key)) 932 db_id = request._get_db_id(db_id) 933 env_id = request.env_id 934 935 if env_id not in self.get_items_range_queue: 936 self.get_items_range_queue[env_id] = dict() 937 938 get_items_range_queue_env = self.get_items_range_queue[env_id] 939 get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse) 940 self.make_live() 941 return False, None, None 942 943 def _on_get_items(self, request: DbRequest, db_keys: Union[Set[InputKeyType], Dict[DbId, Set[InputKeyType]]]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]: 944 coro_id = self.current_caller_coro_info.coro_id 945 env_id = request.env_id 946 947 if env_id not in self.massive_read_queue: 948 self.massive_read_queue[env_id] = dict() 949 950 massive_read_queue_env = self.massive_read_queue[env_id] 951 if coro_id not in massive_read_queue_env: 952 massive_read_queue_env[coro_id] = dict() 953 954 massive_read_queue_env_coro = massive_read_queue_env[coro_id] 955 if request.db_id is not None: 956 db_keys = {request.db_id: db_keys} 957 958 for db_id, keys in db_keys.items(): 959 if db_id not in massive_read_queue_env_coro: 960 massive_read_queue_env_coro[db_id] = set() 961 962 massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id] 963 for key in keys: 964 massive_read_queue_env_coro_db.add(self.serializer.dumps(normalize_compound_key(key))) 965 966 self.make_live() 967 return False, None, None 968 969 def _on_get_all_items(self, request: DbRequest, db_id: DbId) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 970 db_id = request._get_db_id(db_id) 971 coro_id = self.current_caller_coro_info.coro_id 972 self.get_all_items_queue.append((coro_id, db_id, request.env_id)) 973 self.make_live() 974 return False, None, None 975 976 def _on_put(self, request: DbRequest, key: KeyType, value: Any, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]: 977 key = self.serializer.dumps(normalize_compound_key(key)) 978 env_id = request.env_id 979 db_id = request._get_db_id(db_id) 980 981 exception = None 982 result = None 983 try: 984 if env_id not in self.data_cache: 985 self.data_cache[env_id] = dict() 986 987 env_data_cache = self.data_cache[env_id] 988 if db_id not in env_data_cache: 989 env_data_cache[db_id] = dict() 990 991 env_data_cache_db = env_data_cache[db_id] 992 result = env_data_cache_db[key] = self.serializer.dumps(value) 993 except: 994 exception = get_exception() 995 996 self.make_live() 997 return True, result, exception 998 999 def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 1000 result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict() 1001 env_id = request.env_id 1002 db_id = request._get_db_id(db_id) 1003 1004 if env_id not in self.data_cache: 1005 self.data_cache[env_id] = dict() 1006 1007 env_data_cache = self.data_cache[env_id] 1008 if db_id not in result_items: 1009 result_items[db_id] = dict() 1010 1011 result_db_items = result_items[db_id] 1012 1013 if db_id not in env_data_cache: 1014 env_data_cache[db_id] = dict() 1015 1016 env_data_cache_db = env_data_cache[db_id] 1017 for key, value in db_items.items(): 1018 key = self.serializer.dumps(normalize_compound_key(key)) 1019 1020 exception = None 1021 result = None 1022 try: 1023 result = env_data_cache_db[key] = self.serializer.dumps(value) 1024 except: 1025 exception = get_exception() 1026 1027 result_db_items[key] = (result, exception) 1028 1029 self.make_live() 1030 return True, result_items, None 1031 1032 def _on_delete(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> Tuple[bool, None, Optional[Exception]]: 1033 key = self.serializer.dumps(normalize_compound_key(key)) 1034 db_id = request._get_db_id(db_id) 1035 env_id = request.env_id 1036 1037 exception = None 1038 result = None 1039 try: 1040 if env_id not in self.deletion_cache: 1041 self.deletion_cache[env_id] = dict() 1042 1043 env_deletion_cache = self.deletion_cache[env_id] 1044 if db_id not in env_deletion_cache: 1045 env_deletion_cache[db_id] = set() 1046 1047 env_deletion_cache_db = env_deletion_cache[db_id] 1048 env_deletion_cache_db.add(key) 1049 except: 1050 exception = get_exception() 1051 1052 self.make_live() 1053 return True, result, exception 1054 1055 def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]: 1056 key = self.serializer.dumps(normalize_compound_key(key)) 1057 db_id = request._get_db_id(db_id) 1058 env_id = request.env_id 1059 1060 exception = None 1061 result = None 1062 try: 1063 if env_id not in self.kv_deletion_cache: 1064 self.kv_deletion_cache[env_id] = dict() 1065 1066 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1067 if db_id not in env_kv_deletion_cache: 1068 env_kv_deletion_cache[db_id] = dict() 1069 1070 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1071 result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value) 1072 except: 1073 exception = get_exception() 1074 1075 self.make_live() 1076 return True, result, exception 1077 1078 def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]: 1079 result_items: Dict[DbId, Dict[RawKeyType, Optional[Exception]]] = dict() 1080 env_id = request.env_id 1081 1082 if env_id not in self.deletion_cache: 1083 self.deletion_cache[env_id] = dict() 1084 1085 env_deletion_cache = self.deletion_cache[env_id] 1086 if db_id not in result_items: 1087 result_items[db_id] = dict() 1088 1089 result_db_items = result_items[db_id] 1090 1091 if db_id not in env_deletion_cache: 1092 env_deletion_cache[db_id] = set() 1093 1094 env_deletion_cache_db = env_deletion_cache[db_id] 1095 for key in db_items: 1096 key = self.serializer.dumps(normalize_compound_key(key)) 1097 1098 exception = None 1099 try: 1100 env_deletion_cache_db.add(key) 1101 except: 1102 exception = get_exception() 1103 1104 result_db_items[key] = exception 1105 1106 self.make_live() 1107 return True, result_items, None 1108 1109 def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 1110 result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict() 1111 env_id = request.env_id 1112 1113 if env_id not in self.kv_deletion_cache: 1114 self.kv_deletion_cache[env_id] = dict() 1115 1116 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1117 if db_id not in result_items: 1118 result_items[db_id] = dict() 1119 1120 result_db_items = result_items[db_id] 1121 1122 if db_id not in env_kv_deletion_cache: 1123 env_kv_deletion_cache[db_id] = dict() 1124 1125 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1126 for key, value in db_items.items(): 1127 exception = None 1128 result = None 1129 try: 1130 key = self.serializer.dumps(normalize_compound_key(key)) 1131 result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value) 1132 except: 1133 exception = get_exception() 1134 1135 result_db_items[key] = (result, exception) 1136 1137 self.make_live() 1138 return True, result_items, None 1139 1140 def _on_open_databases(self, request: DbRequest, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse: 1141 exception = None 1142 try: 1143 env_info: EnvInfo = self.db_environments[request.env_id] 1144 except KeyError: 1145 exception = UnknownEnvError(request.env_id) 1146 1147 for db_id, db_name in db_names.items(): 1148 env_info.databases[db_id] = env_info.env.open_db(db_name) 1149 env_info.db_names[db_id] = db_name 1150 1151 env_info.env.sync(True) 1152 return True, None, exception 1153 1154 def _on_drop_db(self, request: DbRequest, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse: 1155 coro_id = self.current_caller_coro_info.coro_id 1156 env_id = request.env_id 1157 1158 if env_id not in self.drop_db_requests: 1159 self.drop_db_requests[env_id] = dict() 1160 1161 drop_db_requests_env = self.drop_db_requests[env_id] 1162 1163 if db_id not in drop_db_requests_env: 1164 drop_db_requests_env[db_id] = [False, set()] 1165 1166 drop_db_requests_env_db = drop_db_requests_env[db_id] 1167 1168 if delete: 1169 drop_db_requests_env_db[0] = delete 1170 1171 drop_db_requests_env_db[1].add(coro_id) 1172 self.make_live() 1173 return False, None, None 1174 1175 def sync_in_thread_pool(self, env_id: EnvId = None): 1176 async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool): 1177 if need_to_ensure_asyncio_loop: 1178 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True)) 1179 else: 1180 if asyncio_loop is None: 1181 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get()) 1182 1183 async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment): 1184 def sync_worker(): 1185 env.sync(True) 1186 self.write_locked.discard(env_id) 1187 1188 await task_in_thread_pool(asyncio_loop, sync_worker) 1189 1190 env: lmdb.Environment = self.db_environments[env_id].env 1191 await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env))) 1192 self.write_locked_coro_id.discard(i.coro_id) 1193 def make_service_live_for_a_next_sync(self: 'Db'): 1194 self.make_live() 1195 1196 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 1197 1198 asyncio_loop = None 1199 need_to_ensure_asyncio_loop = False 1200 try: 1201 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 1202 except AsyncioLoopWasNotSetError: 1203 need_to_ensure_asyncio_loop = True 1204 1205 coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, env_id, self, asyncio_loop, need_to_ensure_asyncio_loop) 1206 coro.is_background_coro = True 1207 self.write_locked.add(env_id) 1208 self.write_locked_coro_id.add(coro.coro_id)
Db( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
301 def __init__(self, loop: CoroSchedulerType): 302 super(Db, self).__init__(loop) 303 self.default_db_id: DbName = b'__default__' 304 self.default_env_name: str = '__default__.dbenv' 305 # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict() 306 # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict() 307 self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict() 308 # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list() 309 self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict() 310 self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict() 311 # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list() 312 self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 313 # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict() 314 self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict() 315 self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict() 316 # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict() 317 self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 318 self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 319 self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict() 320 self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict() 321 self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list() 322 self.path_to_default_db_environment: str = None 323 self.app_name_waiter: CoroWrapperBase = None 324 self.default_db_environment: lmdb.Environment = None 325 self.db_environments: Dict[EnvId, EnvInfo] = dict() 326 # self.databases: Dict[Hashable, Any] = dict() 327 # self.db_names: Dict[DbId, DbName] = dict() 328 self.async_loop = None 329 self.sync_time_interval = 1.0 330 self.last_sync_time = perf_counter() 331 self.force_sync: Set[EnvId] = set() 332 self.write_locked: Set[EnvId] = set() 333 self.writes_num: int = 0 334 self.reads_num: int = 0 335 self.deletes_num: int = 0 336 self.db_drops_num: int = 0 337 self.write_locked_coro_id: Set[CoroID] = set() 338 # self.serializer = best_serializer_for_standard_data((DataFormats.binary, 339 # Tags.can_use_bytes, 340 # Tags.decode_str_as_str, 341 # Tags.decode_list_as_list, 342 # Tags.decode_bytes_as_bytes, 343 # Tags.superficial, 344 # Tags.current_platform, 345 # Tags.multi_platform), 346 # TestDataType.small, 347 # 0.1) 348 self.serializer = Serializer(Serializers.msgspec_messagepack) 349 350 self._request_workers = { 351 0: self._on_set_default_db_environment_path, 352 1: self._on_open_databases, 353 2: self._on_drop_db, 354 3: self._on_sync, 355 4: self._on_get, 356 5: self._on_get_items, 357 6: self._on_get_all_items, 358 7: self._on_put, 359 8: self._on_put_items, 360 9: self._on_delete, 361 10: self._on_delete_kv, 362 11: self._on_delete_items, 363 12: self._on_delete_kv_items, 364 13: self._on_open_db_environment, 365 14: self._on_get_first, 366 15: self._on_get_last, 367 16: self._on_get_n_items, 368 17: self._on_get_items_range, 369 }
app_name_waiter: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase
db_environments: Dict[Hashable, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.EnvInfo]
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
372 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 373 return type(self).__name__, { 374 'db_names': list(self.db_names.keys()), 375 'writes num': self.writes_num, 376 'reads num': self.reads_num, 377 'deletes num': self.deletes_num, 378 'db drops num': self.db_drops_num, 379 }
def
norm_key( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]]) -> Union[bytes, str, Tuple[Union[bytes, str]], tuple[Union[Tuple[Union[bytes, str]], Tuple[ForwardRef('NormalizedCompoundKeyType')]]]]:
def
raw_key( self, env_or_id: Union[Environment, Hashable], key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
384 def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType: 385 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 386 if isinstance(env_or_id, lmdb.Environment): 387 env = env_or_id 388 else: 389 env = self.db_environments[env_or_id].env 390 391 if len(raw_key) > env.max_key_size(): 392 raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
def
single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
def
full_processing_iteration(self):
402 def full_processing_iteration(self): 403 if self.default_db_environment is None: 404 if self.path_to_default_db_environment is None: 405 if self.app_name_waiter is None: 406 async def coro(i: Interface, self: 'Db'): 407 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 408 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 409 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 410 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 411 self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name) 412 self.app_name_waiter = None 413 self.make_live() 414 415 self.app_name_waiter = put_root_from_other_service(self, coro, self) 416 self.app_name_waiter.is_background_coro = True 417 418 self.make_dead() 419 return 420 else: 421 self._init_default_db() 422 423 if self.force_sync: 424 envs_need_to_be_sync: Set[DbId] = self.force_sync 425 self.force_sync = set() 426 else: 427 envs_need_to_be_sync = set() 428 429 data_cache_buff = self.data_cache 430 self.data_cache = type(data_cache_buff)() 431 432 read_queue_buff = self.read_queue 433 self.read_queue = type(read_queue_buff)() 434 435 massive_read_queue_buff = self.massive_read_queue 436 self.massive_read_queue = type(massive_read_queue_buff)() 437 438 kv_deletion_cache_buff = self.kv_deletion_cache 439 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 440 441 get_all_items_queue_buff = self.get_all_items_queue 442 self.get_all_items_queue = type(get_all_items_queue_buff)() 443 444 get_first_queue_buff = self.get_first_queue 445 self.get_first_queue = type(get_first_queue_buff)() 446 447 get_last_queue_buff = self.get_last_queue 448 self.get_last_queue = type(get_last_queue_buff)() 449 450 get_n_items_queue_buff = self.get_n_items_queue 451 self.get_n_items_queue = type(get_n_items_queue_buff)() 452 453 get_items_range_queue_buff = self.get_items_range_queue 454 self.get_items_range_queue = type(get_items_range_queue_buff)() 455 456 # put 457 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]): 458 try: 459 with env_info.env.begin(write=True) as txn: 460 for db_id, db_put_info in put_info.items(): 461 if db_id in env_info.databases: 462 for raw_key, value in db_put_info.items(): 463 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False) 464 465 self.writes_num += len(db_put_info) 466 except lmdb.MapFullError: 467 raise DBError.from_exception(db_id) 468 469 for env_id, put_info in data_cache_buff.items(): 470 if env_id in self.db_environments: 471 envs_need_to_be_sync.add(env_id) 472 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 473 474 # TODO: implement delete* methods processing 475 # delete 476 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 477 if env_id in self.db_environments: 478 envs_need_to_be_sync.add(env_id) 479 ... 480 481 for key_info, value in kv_deletion_cache_buff.items(): 482 with self.default_db_environment.begin(write=True) as txn: 483 key, db_id = key_info 484 txn.delete(key, value, db=self.databases[db_id]) 485 self.deletes_num += 1 486 487 # drop 488 drop_db_requests_buff = self.drop_db_requests 489 self.drop_db_requests = type(drop_db_requests_buff)() 490 491 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]): 492 for db_id, db_drop_info in drop_info.items(): 493 delete_db, coro_id = db_drop_info 494 if db_id in env_info.databases: 495 try: 496 with env_info.env.begin(write=True) as txn: 497 txn.drop(db=env_info.databases[db_id], delete=delete_db) 498 if delete_db: 499 del env_info.databases[db_id] 500 del env_info.db_names[db_id] 501 502 self.db_drops_num += 1 503 except lmdb.MapFullError: 504 raise DBError.from_exception(db_id) 505 506 self.register_response(coro_id, None, UnknownEnvError(env_id)) 507 else: 508 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 509 510 for env_id, drop_info in drop_db_requests_buff.items(): 511 if env_id in self.db_environments: 512 envs_need_to_be_sync.add(env_id) 513 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 514 else: 515 for db_id, db_drop_info in drop_info.items(): 516 delete_db, coro_id = db_drop_info 517 self.register_response(coro_id, None, UnknownEnvError(env_id)) 518 519 # get 520 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]: 521 key, db_id, env_id = key_info 522 need_to_get_from_db = True 523 try: 524 value = data_cache_buff[env_id][db_id][key] 525 need_to_get_from_db = False 526 except KeyError: 527 pass 528 529 if need_to_get_from_db: 530 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 531 self.reads_num += 1 532 533 exception = None 534 try: 535 if value is None: 536 exception = DbKeyError(key_info) 537 else: 538 value = self.serializer.loads(value) 539 except: 540 exception = get_exception() 541 542 return value, exception 543 544 # _on_get 545 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 546 if env_id in self.db_environments: 547 env_info = self.db_environments[env_id] 548 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 549 if db_id in env_info.databases: 550 with env_info.env.begin() as txn: 551 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 552 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 553 for coro_id in coro_ids: 554 self.register_response(coro_id, value, exception) 555 else: 556 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 557 for coro_id in coro_ids: 558 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 559 else: 560 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 561 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 562 for coro_id in coro_ids: 563 self.register_response(coro_id, None, UnknownEnvError(env_id)) 564 565 # _on_get_items 566 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 567 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 568 if env_id in self.db_environments: 569 env_info = self.db_environments[env_id] 570 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 571 if coro_id not in results: 572 results[coro_id] = dict() 573 574 coro_results = results[coro_id] 575 for db_id, raw_keys in read_queue_buff_db_info.items(): 576 if db_id not in coro_results: 577 coro_results[db_id] = dict() 578 579 coro_db_results = coro_results[db_id] 580 if db_id in env_info.databases: 581 with env_info.env.begin() as txn: 582 for raw_key in raw_keys: 583 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 584 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 585 else: 586 for coro_id in coro_ids: 587 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 588 else: 589 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 590 for db_id, raw_keys in read_queue_buff_db_info.items(): 591 for coro_id in coro_ids: 592 self.register_response(coro_id, None, UnknownEnvError(env_id)) 593 594 for coro_id, coro_results in results.items(): 595 self.register_response(coro_id, coro_results, None) 596 597 # get all items 598 for coro_id, db_id, env_id in get_all_items_queue_buff: 599 if env_id in self.db_environments: 600 env_info = self.db_environments[env_id] 601 env = env_info.env 602 if db_id in env_info.databases: 603 db = env_info.databases[db_id] 604 with env.begin(db=db) as txn: 605 result = list() 606 exception = None 607 try: 608 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()] 609 self.reads_num += len(result) 610 except: 611 exception = get_exception() 612 613 self.register_response(coro_id, result, exception) 614 else: 615 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 616 else: 617 self.register_response(coro_id, None, UnknownEnvError(env_id)) 618 619 # get_first 620 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 621 if env_id in self.db_environments: 622 env_info = self.db_environments[env_id] 623 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 624 if db_id in env_info.databases: 625 db = env_info.databases[db_id] 626 with env_info.env.begin(db=db) as txn: 627 result = None 628 exception = None 629 try: 630 cursor: lmdb.Cursor = txn.cursor() 631 if cursor.first(): 632 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 633 self.reads_num += 1 634 else: 635 exception = KeyError() 636 except: 637 exception = get_exception() 638 639 for coro_id in coro_ids: 640 self.register_response(coro_id, result, exception) 641 else: 642 for coro_id in coro_ids: 643 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 644 else: 645 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 646 for coro_id in coro_ids: 647 self.register_response(coro_id, None, UnknownEnvError(env_id)) 648 649 # get_last 650 for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items(): 651 if env_id in self.db_environments: 652 env_info = self.db_environments[env_id] 653 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 654 if db_id in env_info.databases: 655 db = env_info.databases[db_id] 656 with env_info.env.begin(db=db) as txn: 657 result = None 658 exception = None 659 try: 660 cursor: lmdb.Cursor = txn.cursor() 661 if cursor.first(): 662 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 663 self.reads_num += 1 664 else: 665 exception = KeyError() 666 except: 667 exception = get_exception() 668 669 for coro_id in coro_ids: 670 self.register_response(coro_id, result, exception) 671 else: 672 for coro_id in coro_ids: 673 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 674 else: 675 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 676 for coro_id in coro_ids: 677 self.register_response(coro_id, None, UnknownEnvError(env_id)) 678 679 # get_n_items 680 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 681 if env_id in self.db_environments: 682 env_info = self.db_environments[env_id] 683 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 684 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 685 db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info 686 if db_id in env_info.databases: 687 db = env_info.databases[db_id] 688 with env_info.env.begin(db=db) as txn: 689 result = None 690 exception = None 691 try: 692 cursor: lmdb.Cursor = txn.cursor() 693 if cursor.set_range(first_desired_raw_key): 694 if reverse: 695 cursor_iterator = cursor.iterprev(keys=True, values=True) 696 else: 697 cursor_iterator = cursor.iternext(keys=True, values=True) 698 699 for raw_key, raw_value in cursor_iterator: 700 if (num_items is not None) and (num_items <= 0): 701 break 702 703 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 704 self.reads_num += 1 705 if num_items is not None: 706 num_items -= 1 707 else: 708 exception = KeyError() 709 except: 710 exception = get_exception() 711 712 self.register_response(coro_id, result, exception) 713 else: 714 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 715 else: 716 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 717 for coro_id in coro_ids: 718 self.register_response(coro_id, None, UnknownEnvError(env_id)) 719 720 # get_items_range 721 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 722 if env_id in self.db_environments: 723 env_info = self.db_environments[env_id] 724 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 725 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 726 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 727 if db_id in env_info.databases: 728 db = env_info.databases[db_id] 729 with env_info.env.begin(db=db) as txn: 730 result = None 731 exception = None 732 try: 733 cursor: lmdb.Cursor = txn.cursor() 734 if cursor.set_range(first_desired_raw_key): 735 if reverse: 736 cursor_iterator = cursor.iterprev(keys=True, values=True) 737 else: 738 cursor_iterator = cursor.iternext(keys=True, values=True) 739 740 for raw_key, raw_value in cursor_iterator: 741 if reverse: 742 if raw_key < last_desired_raw_key: 743 break 744 else: 745 if raw_key > last_desired_raw_key: 746 break 747 748 if (num_items is not None) and (num_items <= 0): 749 break 750 751 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 752 self.reads_num += 1 753 if num_items is not None: 754 num_items -= 1 755 else: 756 exception = KeyError() 757 except: 758 exception = get_exception() 759 760 self.register_response(coro_id, result, exception) 761 else: 762 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 763 else: 764 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 765 for coro_id in coro_ids: 766 self.register_response(coro_id, None, UnknownEnvError(env_id)) 767 768 # sync 769 if envs_need_to_be_sync: 770 for env_id in envs_need_to_be_sync: 771 self.sync_in_thread_pool(env_id) 772 773 self.last_sync_time = perf_counter() 774 775 self.make_dead()
def
in_work(self) -> bool:
777 def in_work(self) -> bool: 778 result: bool = (bool(self.get_first_queue) or bool(self.get_last_queue) or bool(self.get_n_items_queue) or bool(self.get_items_range_queue) or bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)) or (bool(self.force_sync) or ((not bool(self.write_locked)) and (bool(self.data_cache) or bool(self.kv_deletion_cache) or bool(self.drop_db_requests)) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval))) 779 return self.thrifty_in_work(result)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: _description_
Returns: bool: _description_
def
time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
781 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 782 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 783 if self.sync_time_interval > time_since_last_sync_time: 784 return True, self.sync_time_interval - time_since_last_sync_time 785 else: 786 return True, 0
def
sync_in_thread_pool(self, env_id: Hashable = None):
1175 def sync_in_thread_pool(self, env_id: EnvId = None): 1176 async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool): 1177 if need_to_ensure_asyncio_loop: 1178 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True)) 1179 else: 1180 if asyncio_loop is None: 1181 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get()) 1182 1183 async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment): 1184 def sync_worker(): 1185 env.sync(True) 1186 self.write_locked.discard(env_id) 1187 1188 await task_in_thread_pool(asyncio_loop, sync_worker) 1189 1190 env: lmdb.Environment = self.db_environments[env_id].env 1191 await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env))) 1192 self.write_locked_coro_id.discard(i.coro_id) 1193 def make_service_live_for_a_next_sync(self: 'Db'): 1194 self.make_live() 1195 1196 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 1197 1198 asyncio_loop = None 1199 need_to_ensure_asyncio_loop = False 1200 try: 1201 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 1202 except AsyncioLoopWasNotSetError: 1203 need_to_ensure_asyncio_loop = True 1204 1205 coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, env_id, self, asyncio_loop, need_to_ensure_asyncio_loop) 1206 coro.is_background_coro = True 1207 self.write_locked.add(env_id) 1208 self.write_locked_coro_id.add(coro.coro_id)
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class
DbRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
208class DbRequest(ServiceRequest): 209 def __init__(self, env_id: EnvId = None, db_id: DbId = None): 210 super().__init__() 211 self.env_id: EnvId = env_id 212 self.db_id: DbId = db_id 213 self.provide_to_request_handler = True 214 215 def _copy(self) -> 'DbRequest': 216 return DbRequest(self.env_id, self.db_id) 217 218 def _get_db_id(self, db_id: DbId) -> DbId: 219 if self.db_id is None: 220 return db_id 221 else: 222 return self.db_id 223 224 def set_default_db_environment_path(self, path_to_db_environment: str) -> bool: 225 return self._save_to_copy(0, path_to_db_environment) 226 227 def open_databases(self, db_names: Dict[DbId, DbName]) -> None: 228 return self._save_to_copy(1, db_names) 229 230 def drop_db(self, db_id: DbId, delete: bool = False) -> None: 231 return self._save_to_copy(2, db_id, delete) 232 233 def sync(self) -> None: 234 return self._save_to_copy(3) 235 236 def get(self, key: InputKeyType, db_id: DbId = None) -> ValueType: 237 return self._save_to_copy(4, key, db_id) 238 239 def get_first(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]: 240 # Returns first item in DB 241 return self._save_to_copy(14, db_id) 242 243 def get_last(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]: 244 # Returns last item in DB 245 return self._save_to_copy(15, db_id) 246 247 def get_items(self, db_keys: Sequence[InputKeyType], db_id: DbId = None) -> List[Tuple[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]: 248 return self._save_to_copy(5, db_keys, db_id) 249 250 def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 251 return self._save_to_copy(16, desired_key, num, db_id, reverse=False) 252 253 def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 254 return self._save_to_copy(16, desired_key, num, db_id, reverse=True) 255 256 def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 257 return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=False) 258 259 def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 260 return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=True) 261 262 def get_all_items(self, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]: 263 # Returns all items from DB 264 return self._save_to_copy(6, db_id) 265 266 def put(self, key: InputKeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> RawValueType: 267 return self._save_to_copy(7, key, value, db_id) 268 269 def put_items(self, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 270 return self._save_to_copy(8, db_items, db_id) 271 272 def delete(self, key: InputKeyType, db_id: DbId = None) -> RawValueType: 273 return self._save_to_copy(9, key, db_id) 274 275 def delete_kv(self, key: InputKeyType, value: ValueType, db_id: DbId = None) -> RawValueType: 276 return self._save_to_copy(10, key, value, db_id) 277 278 def delete_items(self, db_items: Set[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 279 return self._save_to_copy(11, db_items, db_id) 280 281 def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]: 282 return self._save_to_copy(12, db_items, db_id) 283 284 def open_db_environment(self, path_to_db_environment: str) -> EnvId: 285 return self._save_to_copy(13, path_to_db_environment) 286 287 def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None: 288 # Lock all databases if db_names is None. Databases will be released automatically wnen coroutine execution will be finished 289 return self._save_to_copy(18, db_names) 290 291 def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool: 292 # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished 293 return self._save_to_copy(19, db_names) 294 295 def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None: 296 # Unlock all databases if db_names is None 297 return self._save_to_copy(18, db_names)
def
get( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], db_id: Hashable = None) -> Any:
def
get_first( self, db_id: Hashable = None) -> dict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def
get_last( self, db_id: Hashable = None) -> dict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def
get_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]], db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]]:
def
get_n_items( self, desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def
get_reverse_n_items( self, desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def
get_items_range( self, first_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def
get_reverse_items_range( self, first_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def
get_all_items( self, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def
put( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], value: Union[Any, NoneType] = None, db_id: Hashable = None) -> bytes:
def
put_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Any], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
delete( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], db_id: Hashable = None) -> bytes:
def
delete_kv( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], value: Any, db_id: Hashable = None) -> bytes:
def
delete_items( self, db_items: set[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
delete_kv_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Tuple[typing.Any]], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
try_lock_databases(self, db_names: Union[Set[Hashable], NoneType] = None) -> bool:
291 def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool: 292 # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished 293 return self._save_to_copy(19, db_names)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'Db'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- interface
- i
- async_interface
- ai
KeyType =
typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]
RawKeyType =
<class 'bytes'>
ValueType =
typing.Any
RawValueType =
<class 'bytes'>
DbId =
typing.Hashable
DbName =
<class 'bytes'>
class
DbKeyError(builtins.KeyError):
123class DbKeyError(KeyError): 124 def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None: 125 super().__init__(*args) 126 self.key_info: Tuple[KeyType, DbId] = key_info
Mapping key not found.
DbKeyError( key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable], *args: object)
key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable]
Inherited Members
- builtins.BaseException
- with_traceback
- args