# Module: cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['default_env_path_and_params', 'Db', 'DbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType',
           'DbId', 'EnvId', 'DbName', 'DbKeyError', 'EnvInfo']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
from cengal.parallel_execution.coroutines.coro_standard_services.instance import *
from cengal.file_system.file_manager import path_relative_to_current_dir
from cengal.time_management.cpu_clock_cycles import perf_counter
from cengal.data_manipulation.serialization import *
from cengal.introspection.inspect import get_exception
from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType
from cengal.file_system.path_manager import RelativePath
from cengal.code_flow_control.args_manager import ArgsKwargs
from cengal.text_processing.text_processing import to_identifier
from cengal.math.numbers import RationalNumber
# NOTE: `Set`, `Union` and `Optional` are used throughout this module but were
# previously only available via the wildcard imports above; import them explicitly.
from typing import Hashable, Tuple, List, Set, Any, Dict, Callable, Optional, Union, Sequence, NamedTuple, OrderedDict as OrderedDictType
import sys
import os
import asyncio
try:
    import lmdb
except ImportError:
    from warnings import warn
    warn('''WARNING: `lmdb` library is not installed. Db service will not work.
    To install `lmdb` use: `pip install lmdb`''')
    raise

from os.path import normpath
from uuid import uuid4
from collections import OrderedDict, namedtuple
from functools import update_wrapper
import inspect


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


# Key/value type aliases used by the Db service API.
SingleKeyType = Union[bytes, str]
CompoundKeyType = Union[List[SingleKeyType], Set[SingleKeyType], Tuple[SingleKeyType], List['CompoundKeyType'], Set['CompoundKeyType'], Tuple['CompoundKeyType']]
KeyType = Union[SingleKeyType, CompoundKeyType]
NormalizedCompoundKeyType = Union[Tuple[SingleKeyType], Tuple['NormalizedCompoundKeyType']]
NormalizedKeyType = Union[SingleKeyType, NormalizedCompoundKeyType]
RawKeyType = bytes  # By default record keys are limited to 511 bytes in length, however this can be adjusted by rebuilding the library. The compile-time key length can be queried via Environment.max_key_size()
ValueType = Any
RawValueType = bytes
DbId = Hashable
DbName = bytes
EnvId = Hashable


class KeyInfo:
    """Addresses a single record: a key plus the database and environment it lives in."""

    def __init__(self, key: KeyType, db_id: DbId = None, env_id: EnvId = None):
        self.key: KeyType = key
        self.db_id: DbId = db_id  # None means the environment's main (unnamed) DB
        self.env_id: EnvId = env_id  # None means the default environment


# Value plus the LMDB `put` flags that should be used when writing it.
ValueInfo = NamedTuple('ValueInfo', [('value', ValueType), ('dupdata', bool), ('overwrite', bool), ('append', bool)])
VI = ValueInfo
RawValueInfo = NamedTuple('RawValueInfo', [('value', RawValueType), ('dupdata', bool), ('overwrite', bool), ('append', bool)])
RVI = RawValueInfo


# Default construction parameters for lmdb.Environment (see lmdb.Environment docs).
default_env_path_and_params: ArgsKwargs = ArgsKwargs(
    env_path=None,
    can_be_written_externally=False,
    map_size=20 * 1024**2,
    writemap=True,
    max_dbs=10,
    map_async=True,
    lock=False,
    metasync=False,
    sync=False,
    meminit=False,
)


default_env_params: ArgsKwargs = ArgsKwargs(
    can_be_written_externally=False,
    map_size=20 * 1024**2,
    writemap=True,
    max_dbs=10,
    map_async=True,
    lock=False,
    metasync=False,
    sync=False,
    meminit=False,
)


class EnvInitInfo:
    """Captures everything needed to (re)construct an lmdb.Environment."""

    def __init__(self, env_id: EnvId, env_path: str, can_be_written_externally: bool, *args, **kwargs):
        self.env_id: EnvId = env_id
        self.env_path: str = normpath(env_path)
        self.can_be_written_externally: bool = can_be_written_externally
        # lmdb.Environment(path, *args, **kwargs) — path goes first positionally.
        self.args: Tuple = (self.env_path,) + args
        self.kwargs: Dict = kwargs


class EnvInfo:
    """An opened LMDB environment together with its named databases."""

    # Prefix prepended to a db_id to build the on-disk LMDB database name.
    db_name_prefix = '__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
    db_name_prefix_bytes = b'__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'

    def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None):
        self.init_info: EnvInitInfo = init_info
        # Original check was `not exists or (exists and not isdir)`, which reduces
        # to `not isdir`; makedirs still raises if the path exists as a plain file.
        if not os.path.isdir(init_info.env_path):
            os.makedirs(init_info.env_path)

        self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env
        self.env_id: EnvId = init_info.env_id
        self.databases: Dict[DbId, lmdb._Database] = dict()
        self.db_names: Dict[DbId, DbName] = dict()
        self.prepare_main_db()

    @staticmethod
    def gen_db_name_from_db_id(db_id: DbId) -> Union[None, bytes]:
        """Return the LMDB database name for `db_id`; None maps to the main DB."""
        if db_id is None:
            return None

        return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8')

    def db_name_by_db_id(self, db_id: DbId) -> Union[None, bytes]:
        """Raises UnknownEnvDBError if `db_id` was never opened in this environment."""
        try:
            return self.db_names[db_id]
        except KeyError:
            raise UnknownEnvDBError(self.env_id, db_id)

    def db_by_db_id(self, db_id: DbId) -> lmdb._Database:
        """Raises UnknownEnvDBError if `db_id` was never opened in this environment."""
        try:
            return self.databases[db_id]
        except KeyError:
            raise UnknownEnvDBError(self.env_id, db_id)

    def open_db(self, db_id: DbId, *args, **kwargs) -> lmdb._Database:
        """Open (or create) the named database for `db_id` and register it."""
        db_name: Union[None, bytes] = self.gen_db_name_from_db_id(db_id)
        new_db: lmdb._Database = self.env.open_db(db_name, *args, **kwargs)
        self.databases[db_id] = new_db
        self.db_names[db_id] = db_name
        return new_db

    def prepare_main_db(self) -> lmdb._Database:
        # Fixed: previously opened the main DB but returned None despite the annotation.
        return self.open_db(None)

    def close(self):
        self.env.close()


class DbKeyError(KeyError):
    """Raised when a key is absent; carries the (key, db_id) pair that missed."""

    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
        super().__init__(*args)
        self.key_info: Tuple[KeyType, DbId] = key_info


class DBError(Exception):
    """Wraps a low-level (lmdb) exception together with the affected db_id."""

    def __init__(self, db_id: DbId, original_exception: Exception, *args):
        super().__init__(*args)
        self.db_id: DbId = db_id
        self.original_exception = original_exception

    @staticmethod
    def from_exception(db_id: DbId) -> 'DBError':
        # Must be called from within an `except` block: captures the active exception.
        return DBError(db_id, get_exception())


class UnknownEnvError(Exception):
    pass


class UnknownEnvDBError(Exception):
    pass


class WrongKeyTypeError(TypeError):
    pass

class RawKeyIsTooLargeError(ValueError):
    pass


class DefaultDbEnvironmentCanNotBeClosedManualyError(Exception):
    pass


class NormalizedCompoundKey:
    """Wrapper marking a key as already normalized (see normalize_compound_key)."""

    def __init__(self, normalized_key: NormalizedKeyType) -> None:
        self.normalized_key: NormalizedKeyType = normalized_key

    def __call__(self) -> Any:
        return self.normalized_key

    @staticmethod
    def from_key(key: KeyType) -> 'NormalizedCompoundKey':
        return NormalizedCompoundKey(normalize_compound_key(key))


NCK = NormalizedCompoundKey
InputKeyType = Union[NormalizedCompoundKey, KeyType]


def is_normalized_compound_key(key: KeyType) -> bool:
    """True if `key` is a scalar or an (arbitrarily nested) tuple of scalars."""
    if isinstance(key, (bytes, str, int, float)):
        return True
    elif isinstance(key, tuple):
        return all(is_normalized_compound_key(item) for item in key)
    else:
        return False


def normalize_compound_key(key: KeyType) -> NormalizedKeyType:
    """Convert a compound key into a canonical, hashable form.

    Lists become tuples (order preserved); sets/frozensets become sorted tuples;
    dicts become sorted tuples of (key, value) items. Raises WrongKeyTypeError
    for unsupported container types.
    """
    if isinstance(key, NormalizedCompoundKey):
        return key()
    elif is_normalized_compound_key(key):
        return key
    elif isinstance(key, list):
        need_to_sort: bool = False
    elif isinstance(key, (set, frozenset)):
        need_to_sort = True
    elif isinstance(key, dict):
        key = tuple(key.items())
        need_to_sort = True
    else:
        raise WrongKeyTypeError(f'Wrong key type: {type(key)}: {key}')

    new_key = list()
    for item in key:
        new_key.append(normalize_compound_key(item))

    if need_to_sort:
        new_key.sort()

    key = tuple(new_key)

    return key


class DbRequest(ServiceRequest):
    """Request builder for the Db service; each method stores a request-type id
    plus arguments into a copy of the request (via `_save_to_copy`)."""

    def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False):
        super().__init__()
        self.env_id: EnvId = env_id
        self.db_id: DbId = db_id
        self.needs_sync: bool = needs_sync
        self.provide_to_request_handler = True
        self.can_wait: bool = can_wait  # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.

    def _copy(self) -> 'DbRequest':
        # Fixed: `can_wait` was previously dropped (silently reset to False) on copy.
        return DbRequest(self.env_id, self.db_id, self.needs_sync, self.can_wait)

    def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
        return self._save_to_copy(0, root_path_to_db_environments)

    def open_databases(self, db_ids: Set[DbId], *args, **kwargs) -> None:
        return self._save_to_copy(1, db_ids, *args, **kwargs)

    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
        return self._save_to_copy(2, db_id, delete)

    def sync(self) -> None:
        return self._save_to_copy(3)

    def wait_sync(self) -> None:  # TODO: implement
        return self._save_to_copy(22)

    def set_sync_timeout(self, timeout: RationalNumber) -> None:  # TODO: implement
        return self._save_to_copy(23, timeout)

    def get(self, key: InputKeyType) -> ValueType:
        return self._save_to_copy(4, key)

    def get_first(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns first item in DB
        return self._save_to_copy(14)

    def get_last(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns last item in DB
        return self._save_to_copy(15)

    def get_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(5, db_keys)

    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(16, desired_key, num, reverse=False)

    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(16, desired_key, num, reverse=True)

    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=False)

    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=True)

    def get_all_items(self) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        # Returns all items from DB
        return self._save_to_copy(6)

    def pop(self, key: InputKeyType) -> ValueType:  # TODO: implement
        return self._save_to_copy(24, key)

    def pop_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:  # TODO: implement
        return self._save_to_copy(25, db_keys)

    def put(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> RawValueType:
        return self._save_to_copy(7, key, value)

    def put_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        return self._save_to_copy(8, db_items)

    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
        return self._save_to_copy(19, key, value)

    inc = increment

    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
        # NOTE(review): reuses request type 8 (same as put_items) — presumably a
        # placeholder until a dedicated increment handler exists; confirm intended id.
        return self._save_to_copy(8, db_items)

    inc_items = increment_items

    def replace(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> Tuple[RawValueType, ValueType]:  # TODO: implement
        return self._save_to_copy(20, key, value)

    def replace_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, ValueType, Optional[Exception]]]]:  # TODO: implement
        return self._save_to_copy(21, db_items)

    def delete(self, key: InputKeyType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(9, key)

    def delete_kv(self, key: InputKeyType, value: ValueType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(10, key, value)

    def delete_items(self, db_items: Set[InputKeyType]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(11, db_items)

    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(12, db_items)

    def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None:  # TODO: implement
        # Will execute given callable or coroutine in transaction (after all other queues will be processed)
        # It will run in current thread. So, it will block current thread until it will be finished if it is not a coroutine.
        # On the other hand it will lock environment and all databases in it until coroutine will be finished.
        return self._save_to_copy(29, env_id, callable_or_coro)

    def open_db_environment(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvId:
        return self._save_to_copy(13, env_id, env_path, can_be_written_externally, *args, **kwargs)

    def close_db_environment(self, env_id: EnvId) -> None:
        return self._save_to_copy(18, env_id)

    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Lock all databases if db_names is None. Databases will be released automatically wnen coroutine execution will be finished
        return self._save_to_copy(26, db_names)

    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:  # TODO: implement
        # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished
        return self._save_to_copy(27, db_names)

    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Unlock all databases if db_names is None
        return self._save_to_copy(28, db_names)


# class TransactionContextManager(DbRequest):
#     def __init__(self, *args, **kwargs):
#         super().__init__(*args, **kwargs)
#         self._journal_db_id: DbName = b'__journal__'
#         self._transaction: Optional[lmdb.Transaction] = None
#         self._transaction_context_manager: Optional[lmdb.Transaction] = None
#         self._transaction_history: List[DbRequest] = list()
#         self._committed: bool = False

#     @staticmethod
#     def from_request(request: DbRequest) -> 'TransactionContextManager':
#         return TransactionContextManager(request.env_id, request.db_id)

#     def to_request(self) -> DbRequest:
#         return DbRequest(self.env_id, self.db_id)

#     @staticmethod
#     def request_to_data(request: DbRequest) -> Dict:
#         result = dict()
#         result['env_id'] = request.env_id
#         result['db_id'] = request.db_id
#         result['needs_sync'] = request.needs_sync
#         result['request_type'] = request.request_type
#         result['args'] = request.args
#         result['kwargs'] = request.kwargs
#         result['provide_to_request_handler'] = request.provide_to_request_handler
#         return result

#     @staticmethod
#     def request_from_data(data: Dict) -> DbRequest:
#         request = DbRequest(data['env_id'], data['db_id'], data['needs_sync'])
#         request.request_type = data['request_type']
#         request.args = data['args']
#         request.kwargs = data['kwargs']
#         request.provide_to_request_handler = data['provide_to_request_handler']
#         return request

#     async def _commit(self):
#         i: Interface = current_interface()
#         try:
#             await i(DbRequest(self.env_id, self._journal_db_id, True).lock_databases({self._journal_db_id}))
#             last_committed_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).get(1))
#             last_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).get(0))
#             our_id = last_id
#             for request in self._transaction_history:
#                 our_id += 1
#                 await i(DbRequest(self.env_id, self._journal_db_id, True).put(our_id, TransactionContextManager.request_to_data(request)))

#             last_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).replace(0, our_id))
#             for request in self._transaction_history:
#                 await i(request)
#         except:
#             ...
#         finally:
#             await i(DbRequest(self.env_id, self._journal_db_id, True).unlock_databases({self._journal_db_id}))

#     def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> ServiceRequest:
#         request: DbRequest = super()._save_to_copy(__request__type__, *args, **kwargs)
#         self._transaction_history.append(request)
#         return request

#     def __enter__(self) -> 'TransactionContextManager':
#         self._transaction = self.env.begin(write=True)
#         self._transaction_context_manager = self._transaction.__enter__()
#         return self

#     def __exit__(self, exc_type, exc_val, exc_tb) -> None:
#         self._transaction_context_manager.__exit__(exc_type, exc_val, exc_tb)
#         self._transaction = None
#         self._transaction_context_manager = None

#     async def __aenter__(self) -> 'TransactionContextManager':
#         i: Interface = current_interface()
#         db_service: Db = i._loop.get_service_instance(Db)
#         if self.env_id not in db_service.db_environments:
#             raise UnknownEnvError(self.env_id)

#         env_info: EnvInfo = self.db_environments[self.env_id]
self.db_environments[self.env_id] 476# if self._journal_db_id not in env_info.databases: 477# await i(self.to_request().open_databases({self._journal_db_id})) 478 479# return self 480 481# async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: 482# if (exc_type is None) and (exc_val is None) and (exc_tb is None): 483# await self._commit() 484 485 486def check_request(method: Callable): 487 def wrapper(self: 'Db', request: DbRequest, *args, **kwargs) -> ServiceProcessingResponse: 488 if request.env_id not in self.db_environments: 489 return True, None, UnknownEnvError(request.env_id) 490 491 if request.db_id not in self.db_environments[request.env_id].databases: 492 return True, None, UnknownEnvDBError(request.env_id, request.db_id) 493 494 return method(self, request, *args, **kwargs) 495 496 original_func_sign: inspect.Signature = inspect.signature(method) 497 update_wrapper(wrapper, method) 498 wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation) 499 return wrapper 500 501 502class Db(Service, EntityStatsMixin): 503 def __init__(self, loop: CoroSchedulerType): 504 super(Db, self).__init__(loop) 505 self.default_env_name: str = '__default__.dbenv' 506 self.default_envs_dir: str = 'db_envs' 507 # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict() 508 # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict() 509 self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict() 510 # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list() 511 self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict() 512 self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict() 513 # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list() 514 self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, 
RawValueInfo]]]]] = dict() 515 self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict() 516 # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict() 517 self.max_data_cache_size: Union[None, int] = 10000 518 self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict() 519 self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict() 520 # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict() 521 self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 522 self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 523 self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict() 524 self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict() 525 # self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list() 526 self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict() 527 self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = dict() 528 self.root_path_to_db_environments_rel: RelativePath = None 529 self.app_name_waiter: CoroWrapperBase = None 530 self.default_db_environment: lmdb.Environment = None 531 self.db_environments: Dict[EnvId, EnvInfo] = dict() 532 # self.databases: Dict[Hashable, Any] = dict() 533 # self.db_names: Dict[DbId, DbName] = dict() 534 self.async_loop = None 535 self.sync_time_interval = 1.0 536 self.last_sync_time = perf_counter() 537 self.envs_need_to_be_sync: Set[DbId] = set() 538 self.envs_in_sync: Set[DbId] = set() 539 self.force_sync: Set[EnvId] = set() 540 self.sync_an_each_write: bool = False 541 self.write_locked: Set[EnvId] = set() 542 self.writes_num: int = 0 543 self.reads_num: int = 0 544 self.deletes_num: int = 0 545 self.db_drops_num: int = 0 546 self.write_locked_coro_id: Set[CoroID] = set() 547 
self.wake_up_handle_registered: bool = False 548 # self.serializer = best_serializer_for_standard_data(( 549 # DataFormats.binary, 550 # DataFormats.messagepack, 551 # Tags.can_use_bytes, 552 # Tags.decode_str_as_str, 553 # Tags.decode_list_as_list, 554 # Tags.decode_bytes_as_bytes, 555 # Tags.superficial, 556 # Tags.current_platform, 557 # Tags.multi_platform, 558 # ), 559 # TestDataType.small, 560 # 0.1) 561 self.serializer = Serializer(Serializers.msgspec_messagepack) 562 self._request_workers = { 563 0: self._on_set_root_path_to_db_environments, 564 1: self._on_open_databases, 565 2: self._on_drop_db, 566 3: self._on_sync, 567 4: self._on_get, 568 5: self._on_get_items, 569 6: self._on_get_all_items, 570 7: self._on_put, 571 8: self._on_put_items, 572 9: self._on_delete, 573 10: self._on_delete_kv, 574 11: self._on_delete_items, 575 12: self._on_delete_kv_items, 576 13: self._on_open_db_environment, 577 14: self._on_get_first, 578 15: self._on_get_last, 579 16: self._on_get_n_items, 580 17: self._on_get_items_range, 581 18: self._on_close_db_environment, 582 } 583 584 # TODO: sync with last implementation 585 def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 586 return type(self).__name__, { 587 'db_env_ids': list(self.db_environments.keys()), 588 'writes num': self.writes_num, 589 'reads num': self.reads_num, 590 'deletes num': self.deletes_num, 591 'db drops num': self.db_drops_num, 592 } 593 594 def norm_key(self, key: InputKeyType) -> NormalizedKeyType: 595 return normalize_compound_key(key) 596 597 def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType: 598 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 599 if isinstance(env_or_id, lmdb.Environment): 600 env = env_or_id 601 else: 602 env = self.db_environments[env_or_id].env 603 604 if len(raw_key) > env.max_key_size(): 605 raise RawKeyIsTooLargeError(f'Raw 
form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}') 606 607 def destroy(self): 608 # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues, 609 # sync envirounments and close them. Also we need to prevent new requests from being processed. 610 db_environments_values = self.db_environments.values() 611 self.db_environments = type(self.db_environments)() 612 self.default_db_environment = None 613 for env_info in db_environments_values: 614 env_info.close() 615 616 def single_task_registration_or_immediate_processing( 617 self, *args, **kwargs) -> ServiceProcessingResponse: 618 result = self.try_resolve_request(*args, **kwargs) 619 if result is None: 620 self._ensure_default_db_environment() 621 return True, None, None 622 else: 623 return result 624 625 def _ensure_default_db_environment(self) -> bool: 626 if self.default_db_environment is None: 627 if self.root_path_to_db_environments_rel is None: 628 if self.app_name_waiter is None: 629 async def coro(i: Interface, self: 'Db'): 630 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 631 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 632 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 633 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 634 self.root_path_to_db_environments_rel = RelativePath(RelativePath(app_data_dir_path)(self.default_envs_dir)) 635 self._init_default_db_env() 636 self.app_name_waiter = None 637 self.make_live() 638 639 self.app_name_waiter = put_root_from_other_service(self, coro, self) 640 # self.app_name_waiter.is_background_coro = True 641 642 self.make_dead() 643 return False 644 else: 645 self._init_default_db_env() 646 return True 647 else: 648 return True 649 650 def full_processing_iteration(self): 651 # TODO: combine all queues into one single transaction by env_id. 
Or at most two transactions: read and write. 652 # This will improve performance and will give ability to move transations to other threads or even processess if needed. 653 654 # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 655 # them at most as once a milisecond (it is frequently engough). 656 657 if not self._ensure_default_db_environment(): 658 return 659 660 if self.force_sync: 661 self.envs_need_to_be_sync |= self.force_sync 662 self.force_sync = set() 663 664 put_queue_buff = self.put_queue 665 self.put_queue = type(put_queue_buff)() 666 667 data_cache_buff = self.data_cache # will be cleared at the end of the iteration if necessary 668 669 read_queue_buff = self.read_queue 670 self.read_queue = type(read_queue_buff)() 671 672 massive_read_queue_buff = self.massive_read_queue 673 self.massive_read_queue = type(massive_read_queue_buff)() 674 675 deletion_cache_buff = self.deletion_cache 676 self.deletion_cache = type(deletion_cache_buff)() 677 678 kv_deletion_cache_buff = self.kv_deletion_cache 679 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 680 681 get_all_items_queue_buff = self.get_all_items_queue 682 self.get_all_items_queue = type(get_all_items_queue_buff)() 683 684 get_first_queue_buff = self.get_first_queue 685 self.get_first_queue = type(get_first_queue_buff)() 686 687 get_last_queue_buff = self.get_last_queue 688 self.get_last_queue = type(get_last_queue_buff)() 689 690 get_n_items_queue_buff = self.get_n_items_queue 691 self.get_n_items_queue = type(get_n_items_queue_buff)() 692 693 get_items_range_queue_buff = self.get_items_range_queue 694 self.get_items_range_queue = type(get_items_range_queue_buff)() 695 696 open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests 697 self.open_db_environment_requests = type(open_db_environment_requests_buff)() 698 699 # open_db_environment 700 for 
coro_id, request_info in open_db_environment_requests_buff.items(): 701 env_id, env_path, can_be_written_externally, args, kwargs = request_info 702 if env_id in self.db_environments: 703 self.register_response(coro_id, self.db_environments[env_id]) 704 else: 705 exception = None 706 try: 707 result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs) 708 except: 709 exception = get_exception() 710 711 self.register_response(coro_id, result, exception) 712 713 # put 714 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]): 715 try: 716 with env_info.env.begin(write=True) as txn: 717 for db_id, db_put_info in put_info.items(): 718 if db_id in env_info.databases: 719 for raw_key, values in db_put_info.items(): 720 for value in values: 721 if isinstance(value, RawValueInfo): 722 value, dupdata, overwrite, append = value 723 else: 724 dupdata, overwrite, append = True, True, False 725 726 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append) 727 728 self.writes_num += len(db_put_info) 729 except lmdb.MapFullError: 730 raise DBError.from_exception(db_id) 731 732 for env_id, put_info in put_queue_buff.items(): 733 if env_id in self.db_environments: 734 self.envs_need_to_be_sync.add(env_id) 735 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 736 737 # TODO: implement replace* methods processing 738 739 # delete 740 for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items(): 741 if env_id in self.db_environments: 742 env_info = self.db_environments[env_id] 743 self.envs_need_to_be_sync.add(env_id) 744 with env_info.env.begin(write=True) as txn: 745 for db_id, del_keys in deletion_cache_buff_db_info.items(): 746 if db_id in env_info.databases: 747 for del_raw_key in del_keys: 748 txn.delete(del_raw_key, db=env_info.databases[db_id]) 749 self.deletes_num += 1 750 try: 751 
data_cache_buff[env_id][db_id].pop(del_raw_key, None) 752 except KeyError: 753 pass 754 755 # delete_kv 756 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 757 if env_id in self.db_environments: 758 env_info = self.db_environments[env_id] 759 self.envs_need_to_be_sync.add(env_id) 760 with env_info.env.begin(write=True) as txn: 761 for db_id, del_keys in kv_deletion_cache_buff_db_info.items(): 762 if db_id in env_info.databases: 763 for del_raw_key, del_raw_values in del_keys.items(): 764 for del_raw_value in del_raw_values: 765 txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id]) 766 self.deletes_num += 1 767 try: 768 raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key] 769 new_raw_values: List[Union[RawValueType, RawValueInfo]] = list() 770 for raw_value in raw_values: 771 raw_value_original = raw_value 772 if isinstance(raw_value, RawValueInfo): 773 raw_value, _, _, _ = raw_value 774 775 if raw_value != del_raw_value: 776 new_raw_values.append(raw_value_original) 777 778 if new_raw_values: 779 data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values 780 else: 781 data_cache_buff[env_id][db_id].pop(del_raw_key, None) 782 except KeyError: 783 pass 784 785 # drop 786 drop_db_requests_buff = self.drop_db_requests 787 self.drop_db_requests = type(drop_db_requests_buff)() 788 789 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]): 790 for db_id, db_drop_info in drop_info.items(): 791 delete_db, coro_id = db_drop_info 792 if db_id in env_info.databases: 793 try: 794 with env_info.env.begin(write=True) as txn: 795 txn.drop(db=env_info.databases[db_id], delete=delete_db) 796 if delete_db: 797 del env_info.databases[db_id] 798 del env_info.db_names[db_id] 799 800 self.db_drops_num += 1 801 except lmdb.MapFullError: 802 raise DBError.from_exception(db_id) 803 804 self.register_response(coro_id, None, UnknownEnvError(env_id)) 805 else: 806 
self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 807 808 for env_id, drop_info in drop_db_requests_buff.items(): 809 if env_id in self.db_environments: 810 self.envs_need_to_be_sync.add(env_id) 811 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 812 else: 813 for db_id, db_drop_info in drop_info.items(): 814 delete_db, coro_id = db_drop_info 815 self.register_response(coro_id, None, UnknownEnvError(env_id)) 816 817 # get 818 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]: 819 key, db_id, env_id = key_info 820 need_to_get_from_db = True 821 try: 822 values = data_cache_buff[env_id][db_id][key] 823 if values: 824 value = values[0] 825 if isinstance(value, RawValueInfo): 826 value, _, _, _ = value 827 828 need_to_get_from_db = False 829 except KeyError: 830 pass 831 832 if need_to_get_from_db: 833 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 834 self.reads_num += 1 835 836 exception = None 837 try: 838 if value is None: 839 exception = DbKeyError(key_info) 840 else: 841 value = self.serializer.loads(value) 842 except: 843 exception = get_exception() 844 845 return value, exception 846 847 # _on_get 848 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 849 if env_id in self.db_environments: 850 env_info = self.db_environments[env_id] 851 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 852 if db_id in env_info.databases: 853 with env_info.env.begin() as txn: 854 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 855 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 856 for coro_id in coro_ids: 857 self.register_response(coro_id, value, exception) 858 else: 859 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 860 for coro_id in coro_ids: 861 
self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 862 else: 863 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 864 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 865 for coro_id in coro_ids: 866 self.register_response(coro_id, None, UnknownEnvError(env_id)) 867 868 # _on_get_items 869 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 870 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 871 if env_id in self.db_environments: 872 env_info = self.db_environments[env_id] 873 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 874 if coro_id not in results: 875 results[coro_id] = dict() 876 877 coro_results = results[coro_id] 878 for db_id, raw_keys in read_queue_buff_db_info.items(): 879 if db_id not in coro_results: 880 coro_results[db_id] = OrderedDict() 881 882 coro_db_results = coro_results[db_id] 883 if db_id in env_info.databases: 884 with env_info.env.begin() as txn: 885 for raw_key in raw_keys: 886 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 887 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 888 else: 889 for coro_id in coro_ids: 890 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 891 else: 892 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 893 for db_id, raw_keys in read_queue_buff_db_info.items(): 894 for coro_id in coro_ids: 895 self.register_response(coro_id, None, UnknownEnvError(env_id)) 896 897 for coro_id, coro_results in results.items(): 898 if coro_results: 899 db_id = tuple(coro_results.keys())[0] 900 self.register_response(coro_id, coro_results[db_id], None) 901 else: 902 self.register_response(coro_id, OrderedDict(), None) 903 904 # get all items 905 for env_id, requests_info in get_all_items_queue_buff.items(): 906 for request_info in requests_info: 
907 coro_id, db_id = request_info 908 if env_id in self.db_environments: 909 env_info = self.db_environments[env_id] 910 env = env_info.env 911 if db_id in env_info.databases: 912 db = env_info.databases[db_id] 913 with env.begin(db=db) as txn: 914 result = list() 915 exception = None 916 try: 917 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)] 918 self.reads_num += len(result) 919 except: 920 exception = get_exception() 921 922 self.register_response(coro_id, result, exception) 923 else: 924 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 925 else: 926 self.register_response(coro_id, None, UnknownEnvError(env_id)) 927 928 # get_first 929 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 930 if env_id in self.db_environments: 931 env_info = self.db_environments[env_id] 932 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 933 if db_id in env_info.databases: 934 db = env_info.databases[db_id] 935 with env_info.env.begin(db=db) as txn: 936 result = None 937 exception = None 938 try: 939 cursor: lmdb.Cursor = txn.cursor() 940 if cursor.first(): 941 key = cursor.key() 942 key_found: bool = True 943 while key.startswith(EnvInfo.db_name_prefix_bytes): 944 key_found = cursor.next_nodup() 945 if not key_found: 946 break 947 948 key = cursor.key() 949 950 if key_found: 951 value = cursor.value() 952 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 953 self.reads_num += 1 954 else: 955 exception = KeyError() 956 else: 957 exception = KeyError() 958 except: 959 exception = get_exception() 960 961 for coro_id in coro_ids: 962 self.register_response(coro_id, result, exception) 963 else: 964 for coro_id in coro_ids: 965 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 966 else: 967 for db_id, coro_ids in 
get_first_queue_buff_db_info.items(): 968 for coro_id in coro_ids: 969 self.register_response(coro_id, None, UnknownEnvError(env_id)) 970 971 # get_last 972 for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items(): 973 if env_id in self.db_environments: 974 env_info = self.db_environments[env_id] 975 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 976 if db_id in env_info.databases: 977 db = env_info.databases[db_id] 978 with env_info.env.begin(db=db) as txn: 979 result = None 980 exception = None 981 try: 982 cursor: lmdb.Cursor = txn.cursor() 983 if cursor.last(): 984 key: bytes = cursor.key() 985 key_found: bool = True 986 key = cursor.key() 987 while key.startswith(EnvInfo.db_name_prefix_bytes): 988 key_found = cursor.prev_nodup() 989 if not key_found: 990 break 991 992 key = cursor.key() 993 994 if key_found: 995 value: bytes = cursor.value() 996 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 997 self.reads_num += 1 998 else: 999 exception = KeyError() 1000 else: 1001 exception = KeyError() 1002 except: 1003 exception = get_exception() 1004 1005 for coro_id in coro_ids: 1006 self.register_response(coro_id, result, exception) 1007 else: 1008 for coro_id in coro_ids: 1009 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1010 else: 1011 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 1012 for coro_id in coro_ids: 1013 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1014 1015 # get_n_items 1016 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 1017 if env_id in self.db_environments: 1018 env_info = self.db_environments[env_id] 1019 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1020 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1021 db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info 1022 if db_id in env_info.databases: 
1023 db = env_info.databases[db_id] 1024 with env_info.env.begin(db=db) as txn: 1025 exception = None 1026 try: 1027 cursor: lmdb.Cursor = txn.cursor() 1028 if cursor.set_range(first_desired_raw_key): 1029 if reverse: 1030 cursor_iterator = cursor.iterprev(keys=True, values=True) 1031 else: 1032 cursor_iterator = cursor.iternext(keys=True, values=True) 1033 1034 for raw_key, raw_value in cursor_iterator: 1035 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1036 continue 1037 1038 if (num_items is not None) and (num_items <= 0): 1039 break 1040 1041 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1042 self.reads_num += 1 1043 if num_items is not None: 1044 num_items -= 1 1045 else: 1046 exception = KeyError() 1047 except: 1048 exception = get_exception() 1049 1050 self.register_response(coro_id, coro_results, exception) 1051 else: 1052 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1053 else: 1054 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1055 for coro_id in coro_ids: 1056 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1057 1058 # get_items_range 1059 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 1060 if env_id in self.db_environments: 1061 env_info = self.db_environments[env_id] 1062 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1063 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1064 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 1065 if db_id in env_info.databases: 1066 db = env_info.databases[db_id] 1067 with env_info.env.begin(db=db) as txn: 1068 exception = None 1069 try: 1070 cursor: lmdb.Cursor = txn.cursor() 1071 if cursor.set_range(first_desired_raw_key): 1072 if reverse: 1073 cursor_iterator = cursor.iterprev(keys=True, values=True) 1074 else: 1075 
cursor_iterator = cursor.iternext(keys=True, values=True) 1076 1077 for raw_key, raw_value in cursor_iterator: 1078 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1079 continue 1080 1081 if reverse: 1082 if raw_key < last_desired_raw_key: 1083 break 1084 else: 1085 if raw_key > last_desired_raw_key: 1086 break 1087 1088 if (num_items is not None) and (num_items <= 0): 1089 break 1090 1091 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1092 self.reads_num += 1 1093 if num_items is not None: 1094 num_items -= 1 1095 else: 1096 exception = KeyError() 1097 except: 1098 exception = get_exception() 1099 1100 self.register_response(coro_id, coro_results, exception) 1101 else: 1102 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1103 else: 1104 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1105 for coro_id in coro_ids: 1106 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1107 1108 need_to_sync = self.sync_an_each_write 1109 1110 # periodic sync 1111 if (perf_counter() - self.last_sync_time) >= self.sync_time_interval: 1112 for env_id, env_info in self.db_environments.items(): 1113 self.envs_need_to_be_sync.add(env_id) 1114 need_to_sync = True 1115 1116 # sync 1117 if need_to_sync and self.envs_need_to_be_sync: 1118 envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync 1119 self.envs_need_to_be_sync = set(self.envs_in_sync) 1120 for env_id in envs_need_to_be_sync_bak: 1121 self.sync_in_thread_pool(env_id) 1122 1123 self.last_sync_time = perf_counter() 1124 1125 # invalidate data_cache for envs that can be written externally 1126 for env_id, env_info in self.db_environments.items(): 1127 if env_info.init_info.can_be_written_externally: 1128 self.data_cache.pop(env_id, None) 1129 1130 # clear too big caches 1131 for env_id, env_info in self.db_environments.items(): 1132 if env_id in self.data_cache: 1133 if 
len(self.data_cache[env_id]) > self.max_data_cache_size: 1134 del self.data_cache[env_id] 1135 1136 self.make_dead() 1137 1138 def in_work(self) -> bool: 1139 result: bool = bool(self.default_db_environment is None) \ 1140 or bool(self.open_db_environment_requests) \ 1141 or bool(self.get_first_queue) \ 1142 or bool(self.get_last_queue) \ 1143 or bool(self.get_n_items_queue) \ 1144 or bool(self.get_items_range_queue) \ 1145 or bool(self.read_queue) \ 1146 or bool(self.massive_read_queue) \ 1147 or bool(self.get_all_items_queue)\ 1148 or bool(self.force_sync) \ 1149 or bool(self.deletion_cache) \ 1150 or bool(self.kv_deletion_cache) \ 1151 or bool(self.drop_db_requests) \ 1152 or bool(self.put_queue) \ 1153 or bool(self.envs_need_to_be_sync) \ 1154 or ((perf_counter() - self.last_sync_time) >= self.sync_time_interval) 1155 return self.thrifty_in_work(result) 1156 1157 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 1158 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 1159 if self.sync_time_interval > time_since_last_sync_time: 1160 return True, self.sync_time_interval - time_since_last_sync_time 1161 else: 1162 return True, 0 1163 1164 def _init_db_env(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvInfo: 1165 if env_id in self.db_environments: 1166 return self.db_environments[env_id] 1167 1168 if env_path is None: 1169 if env_id is None: 1170 env_path = self.root_path_to_db_environments_rel(self.default_env_name) 1171 else: 1172 env_path = self.root_path_to_db_environments_rel(to_identifier(f'{env_id}')) 1173 else: 1174 if os.path.isabs(env_path): 1175 env_path = os.path.normpath(env_path) 1176 else: 1177 env_path = self.root_path_to_db_environments_rel(env_path) 1178 1179 env_init_info: EnvInitInfo = EnvInitInfo( 1180 env_id, 1181 env_path, 1182 can_be_written_externally, 1183 *args, **kwargs 1184 ) 1185 env_info: EnvInfo = EnvInfo(env_init_info) 
1186 self.db_environments[env_info.env_id] = env_info 1187 env_info.env.sync(True) 1188 return env_info 1189 1190 def _init_default_db_env(self): 1191 args, kwargs = default_env_path_and_params() 1192 env_info: EnvInfo = self._init_db_env(None, *args, **kwargs) 1193 self.default_db_environment = env_info 1194 1195 def _on_open_db_environment(self, request: DbRequest, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> Tuple[bool, EnvInfo, Exception]: 1196 self.open_db_environment_requests[self.current_caller_coro_info.coro_id] = (env_id, env_path, can_be_written_externally, args, kwargs) 1197 self.make_live() 1198 return False, None, None 1199 1200 @check_request 1201 def _on_close_db_environment(self, request: DbRequest): 1202 env_id: EnvId = request.env_id 1203 1204 if env_id is None: 1205 return True, False, DefaultDbEnvironmentCanNotBeClosedManualyError() 1206 1207 if env_id in self.db_environments: 1208 self.db_environments[env_id].close() 1209 del self.db_environments[env_id] 1210 return True, True, None 1211 else: 1212 return True, False, UnknownEnvError(env_id) 1213 1214 def _on_set_root_path_to_db_environments(self, request: DbRequest, root_path_to_db_environments: str) -> ServiceProcessingResponse: 1215 self.root_path_to_db_environments_rel = RelativePath(root_path_to_db_environments) 1216 return True, True, None 1217 1218 @check_request 1219 def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse: 1220 if self.put_queue: 1221 self.force_sync.add(request.env_id) 1222 self.make_live() 1223 else: 1224 # self.default_db_environment.sync(True) 1225 self.sync_in_thread_pool(request.env_id) 1226 1227 return True, None, None 1228 1229 @check_request 1230 def _on_get(self, request: DbRequest, key: KeyType) -> ServiceProcessingResponse: 1231 coro_id = self.current_caller_coro_info.coro_id 1232 key = self.serializer.dumps(normalize_compound_key(key)) 1233 db_id = request.db_id 1234 env_id = request.env_id 1235 
1236 if env_id in self.data_cache: 1237 env_cache = self.data_cache[env_id] 1238 if db_id in env_cache: 1239 db_cache = env_cache[db_id] 1240 if key in db_cache: 1241 values = db_cache[key] 1242 if values: 1243 value = values[0] 1244 if isinstance(value, RawValueInfo): 1245 value, _, _, _ = value 1246 1247 value = self.serializer.loads(value) 1248 return True, value, None 1249 1250 if env_id not in self.read_queue: 1251 self.read_queue[env_id] = dict() 1252 1253 read_queue_env = self.read_queue[env_id] 1254 if db_id not in read_queue_env: 1255 read_queue_env[db_id] = dict() 1256 1257 read_queue_env_db = read_queue_env[db_id] 1258 if key not in read_queue_env_db: 1259 read_queue_env_db[key] = set() 1260 1261 read_queue_env_db_key = read_queue_env_db[key] 1262 read_queue_env_db_key.add(coro_id) 1263 self.make_live() 1264 return False, None, None 1265 1266 @check_request 1267 def _on_get_first(self, request: DbRequest) -> ServiceProcessingResponse: 1268 coro_id = self.current_caller_coro_info.coro_id 1269 db_id = request.db_id 1270 env_id = request.env_id 1271 1272 if env_id not in self.get_first_queue: 1273 self.get_first_queue[env_id] = dict() 1274 1275 get_first_queue_env = self.get_first_queue[env_id] 1276 if db_id not in get_first_queue_env: 1277 get_first_queue_env[db_id] = set() 1278 1279 get_first_queue_env_db = get_first_queue_env[db_id] 1280 get_first_queue_env_db.add(coro_id) 1281 1282 self.make_live() 1283 return False, None, None 1284 1285 @check_request 1286 def _on_get_last(self, request: DbRequest) -> ServiceProcessingResponse: 1287 coro_id = self.current_caller_coro_info.coro_id 1288 db_id = request.db_id 1289 env_id = request.env_id 1290 1291 if env_id not in self.get_last_queue: 1292 self.get_last_queue[env_id] = dict() 1293 1294 get_last_queue_env = self.get_last_queue[env_id] 1295 if db_id not in get_last_queue_env: 1296 get_last_queue_env[db_id] = set() 1297 1298 get_last_queue_env_db = get_last_queue_env[db_id] 1299 
get_last_queue_env_db.add(coro_id) 1300 1301 self.make_live() 1302 return False, None, None 1303 1304 @check_request 1305 def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 1306 coro_id = self.current_caller_coro_info.coro_id 1307 desired_key = self.serializer.dumps(normalize_compound_key(desired_key)) 1308 db_id = request.db_id 1309 env_id = request.env_id 1310 1311 if env_id not in self.get_n_items_queue: 1312 self.get_n_items_queue[env_id] = dict() 1313 1314 get_n_items_queue_env = self.get_n_items_queue[env_id] 1315 get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse) 1316 self.make_live() 1317 return False, None, None 1318 1319 @check_request 1320 def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 1321 coro_id = self.current_caller_coro_info.coro_id 1322 first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key)) 1323 last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key)) 1324 db_id = request.db_id 1325 env_id = request.env_id 1326 1327 if env_id not in self.get_items_range_queue: 1328 self.get_items_range_queue[env_id] = dict() 1329 1330 get_items_range_queue_env = self.get_items_range_queue[env_id] 1331 get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse) 1332 self.make_live() 1333 return False, None, None 1334 1335 @check_request 1336 def _on_get_items(self, request: DbRequest, db_keys: Sequence[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]: 1337 coro_id = self.current_caller_coro_info.coro_id 1338 db_id = request.db_id 1339 
env_id = request.env_id 1340 1341 if env_id not in self.massive_read_queue: 1342 self.massive_read_queue[env_id] = dict() 1343 1344 massive_read_queue_env = self.massive_read_queue[env_id] 1345 if coro_id not in massive_read_queue_env: 1346 massive_read_queue_env[coro_id] = dict() 1347 1348 massive_read_queue_env_coro = massive_read_queue_env[coro_id] 1349 if db_id not in massive_read_queue_env_coro: 1350 massive_read_queue_env_coro[db_id] = list() 1351 1352 massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id] 1353 for key in db_keys: 1354 massive_read_queue_env_coro_db.append(self.serializer.dumps(normalize_compound_key(key))) 1355 1356 self.make_live() 1357 return False, None, None 1358 1359 @check_request 1360 def _on_get_all_items(self, request: DbRequest) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 1361 env_id = request.env_id 1362 db_id = request.db_id 1363 coro_id = self.current_caller_coro_info.coro_id 1364 if env_id not in self.get_all_items_queue: 1365 self.get_all_items_queue[env_id] = list() 1366 1367 self.get_all_items_queue[env_id].append((coro_id, db_id)) 1368 self.make_live() 1369 return False, None, None 1370 1371 @check_request 1372 def _on_put(self, request: DbRequest, key: KeyType, value: Union[ValueType, ValueInfo]) -> Tuple[bool, RawValueType, Optional[Exception]]: 1373 key = self.serializer.dumps(normalize_compound_key(key)) 1374 db_id = request.db_id 1375 env_id = request.env_id 1376 1377 exception = None 1378 result = None 1379 try: 1380 # self.put_queue 1381 if env_id not in self.put_queue: 1382 self.put_queue[env_id] = dict() 1383 1384 env_put_queue = self.put_queue[env_id] 1385 if db_id not in env_put_queue: 1386 env_put_queue[db_id] = dict() 1387 1388 env_put_queue_db = env_put_queue[db_id] 1389 if key not in env_put_queue_db: 1390 env_put_queue_db[key] = list() 1391 1392 # self.data_cache 1393 if env_id not in self.data_cache: 1394 self.data_cache[env_id] = dict() 1395 1396 
env_data_cache = self.data_cache[env_id] 1397 if db_id not in env_data_cache: 1398 env_data_cache[db_id] = dict() 1399 1400 env_data_cache_db = env_data_cache[db_id] 1401 if key not in env_data_cache_db: 1402 env_data_cache_db[key] = list() 1403 1404 # both 1405 if isinstance(value, ValueInfo): 1406 result = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append) 1407 if env_put_queue_db[key] and not value.dupdata: 1408 env_data_cache_db[key][0] = env_put_queue_db[key][0] = result 1409 else: 1410 env_put_queue_db[key].append(result) 1411 env_data_cache_db[key].append(result) 1412 else: 1413 result = self.serializer.dumps(value) 1414 if env_put_queue_db[key]: 1415 env_data_cache_db[key][0] = env_put_queue_db[key][0] = result 1416 else: 1417 env_put_queue_db[key].append(result) 1418 env_data_cache_db[key].append(result) 1419 except: 1420 exception = get_exception() 1421 1422 self.make_live() 1423 return True, result, exception 1424 1425 @check_request 1426 def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 1427 result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict() 1428 db_id = request.db_id 1429 env_id = request.env_id 1430 1431 # self.put_queue 1432 if env_id not in self.put_queue: 1433 self.put_queue[env_id] = dict() 1434 1435 env_put_queue = self.put_queue[env_id] 1436 if db_id not in result_items: 1437 result_items[db_id] = dict() 1438 1439 result_db_items = result_items[db_id] 1440 1441 if db_id not in env_put_queue: 1442 env_put_queue[db_id] = dict() 1443 1444 env_put_queue_db = env_put_queue[db_id] 1445 1446 # self.data_cache 1447 if env_id not in self.data_cache: 1448 self.data_cache[env_id] = dict() 1449 1450 env_data_cache = self.data_cache[env_id] 1451 if db_id not in result_items: 1452 result_items[db_id] = dict() 1453 
1454 result_db_items = result_items[db_id] 1455 1456 if db_id not in env_data_cache: 1457 env_data_cache[db_id] = dict() 1458 1459 env_data_cache_db = env_data_cache[db_id] 1460 1461 # both 1462 for key, value in db_items.items(): 1463 key = self.serializer.dumps(normalize_compound_key(key)) 1464 1465 exception = None 1466 result = None 1467 try: 1468 if key not in env_put_queue_db: 1469 env_put_queue_db[key] = list() 1470 1471 if key not in env_data_cache_db: 1472 env_data_cache_db[key] = list() 1473 1474 if isinstance(value, ValueInfo): 1475 result = env_data_cache_db[key][0] = env_put_queue_db[key][0] = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append) 1476 if env_data_cache_db[key] and not value.dupdata: 1477 env_data_cache_db[key][0] = env_put_queue_db[key][0] = result 1478 else: 1479 env_put_queue_db[key].append(result) 1480 env_data_cache_db[key].append(result) 1481 else: 1482 result = self.serializer.dumps(value) 1483 if env_data_cache_db[key]: 1484 env_data_cache_db[key][0] = env_put_queue_db[key][0] = result 1485 else: 1486 env_put_queue_db[key].append(result) 1487 env_data_cache_db[key].append(result) 1488 except: 1489 exception = get_exception() 1490 1491 result_db_items[key] = (result, exception) 1492 1493 self.make_live() 1494 return True, result_items, None 1495 1496 @check_request 1497 def _on_delete(self, request: DbRequest, key: KeyType) -> Tuple[bool, None, Optional[Exception]]: 1498 key = self.serializer.dumps(normalize_compound_key(key)) 1499 if key.startswith(EnvInfo.db_name_prefix_bytes): 1500 return True, None, KeyError(f'Can not delete special key (db info key): {key}') 1501 1502 db_id = request.db_id 1503 env_id = request.env_id 1504 1505 exception = None 1506 result = None 1507 try: 1508 if env_id not in self.deletion_cache: 1509 self.deletion_cache[env_id] = dict() 1510 1511 env_deletion_cache = self.deletion_cache[env_id] 1512 if db_id not in env_deletion_cache: 1513 env_deletion_cache[db_id] 
= set() 1514 1515 env_deletion_cache_db = env_deletion_cache[db_id] 1516 env_deletion_cache_db.add(key) 1517 except: 1518 exception = get_exception() 1519 1520 self.make_live() 1521 return True, result, exception 1522 1523 @check_request 1524 def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType) -> Tuple[bool, RawValueType, Optional[Exception]]: 1525 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 1526 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1527 return True, None, KeyError(f'Can not delete special raw_key (db info raw_key): {raw_key}') 1528 1529 db_id = request.db_id 1530 env_id = request.env_id 1531 1532 exception = None 1533 raw_value = None 1534 try: 1535 if env_id not in self.kv_deletion_cache: 1536 self.kv_deletion_cache[env_id] = dict() 1537 1538 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1539 if db_id not in env_kv_deletion_cache: 1540 env_kv_deletion_cache[db_id] = dict() 1541 1542 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1543 if raw_key not in env_kv_deletion_cache_db: 1544 env_kv_deletion_cache_db[raw_key] = list() 1545 1546 raw_value = self.serializer.dumps(value) 1547 env_kv_deletion_cache_db[raw_key].append(raw_value) 1548 except: 1549 exception = get_exception() 1550 1551 self.make_live() 1552 return True, (raw_key, raw_value), exception 1553 1554 @check_request 1555 def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]: 1556 result_items: Dict[InputKeyType, Tuple[RawKeyType, Optional[Exception]]] = dict() 1557 db_id = request.db_id 1558 env_id = request.env_id 1559 1560 if env_id not in self.deletion_cache: 1561 self.deletion_cache[env_id] = dict() 1562 1563 env_deletion_cache = self.deletion_cache[env_id] 1564 if db_id not in env_deletion_cache: 1565 env_deletion_cache[db_id] = set() 1566 1567 env_deletion_cache_db = env_deletion_cache[db_id] 1568 for key 
in db_items: 1569 exception = None 1570 try: 1571 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 1572 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1573 exception = KeyError(f'Can not delete special key (db info key): {raw_key}') 1574 else: 1575 env_deletion_cache_db.add(raw_key) 1576 except: 1577 exception = get_exception() 1578 1579 result_items[key] = (raw_key, exception) 1580 1581 self.make_live() 1582 return True, result_items, None 1583 1584 @check_request 1585 def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]: 1586 result_items: Dict[InputKeyType, Tuple[RawKeyType, RawValueType, Optional[Exception]]] = dict() 1587 db_id = request.db_id 1588 env_id = request.env_id 1589 1590 if env_id not in self.kv_deletion_cache: 1591 self.kv_deletion_cache[env_id] = dict() 1592 1593 env_kv_deletion_cache = self.kv_deletion_cache[env_id] 1594 if db_id not in env_kv_deletion_cache: 1595 env_kv_deletion_cache[db_id] = dict() 1596 1597 env_kv_deletion_cache_db = env_kv_deletion_cache[db_id] 1598 for key, value in db_items.items(): 1599 exception = None 1600 raw_key = None 1601 raw_value = None 1602 try: 1603 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 1604 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1605 exception = KeyError(f'Can not delete special key (db info key): {raw_key}') 1606 else: 1607 if raw_key not in env_kv_deletion_cache_db: 1608 env_kv_deletion_cache_db[raw_key] = list() 1609 1610 raw_value = self.serializer.dumps(value) 1611 env_kv_deletion_cache_db[raw_key].append(self.serializer.dumps(value)) 1612 except: 1613 exception = get_exception() 1614 1615 result_items[key] = (raw_key, raw_value, exception) 1616 1617 self.make_live() 1618 return True, result_items, None 1619 1620 def _on_open_databases(self, request: DbRequest, db_ids: Set[DbId], *args, 
**kwargs) -> ServiceProcessingResponse: 1621 try: 1622 env_info: EnvInfo = self.db_environments[request.env_id] 1623 except KeyError: 1624 exception = UnknownEnvError(request.env_id) 1625 return True, None, exception 1626 1627 for db_id in db_ids: 1628 env_info.open_db(db_id, *args, **kwargs) 1629 1630 env_info.env.sync(True) 1631 return True, None, None 1632 1633 @check_request 1634 def _on_drop_db(self, request: DbRequest, delete: bool = False) -> ServiceProcessingResponse: 1635 coro_id = self.current_caller_coro_info.coro_id 1636 db_id = request.db_id 1637 env_id = request.env_id 1638 1639 if env_id not in self.drop_db_requests: 1640 self.drop_db_requests[env_id] = dict() 1641 1642 drop_db_requests_env = self.drop_db_requests[env_id] 1643 1644 if db_id not in drop_db_requests_env: 1645 drop_db_requests_env[db_id] = (False, set()) 1646 1647 drop_db_requests_env_db: Tuple[bool, Set[CoroID]] = drop_db_requests_env[db_id] 1648 1649 if delete and (not drop_db_requests_env_db[0]): 1650 drop_db_requests_env_db = (True, drop_db_requests_env_db[1]) 1651 1652 drop_db_requests_env_db[1].add(coro_id) 1653 self.make_live() 1654 return False, None, None 1655 1656 def sync_in_thread_pool(self, env_id: EnvId = None): 1657 async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool): 1658 if need_to_ensure_asyncio_loop: 1659 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True)) 1660 else: 1661 if asyncio_loop is None: 1662 asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get()) 1663 1664 async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment): 1665 def sync_worker(): 1666 env.sync(True) 1667 self.write_locked.discard(env_id) 1668 1669 await task_in_thread_pool(asyncio_loop, sync_worker) 1670 1671 env: lmdb.Environment = self.db_environments[env_id].env 1672 await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop, env))) 1673 
self.write_locked_coro_id.discard(i.coro_id) 1674 self.envs_in_sync.discard(env_id) 1675 def make_service_live_for_a_next_sync(self: 'Db'): 1676 self.make_live() 1677 self.wake_up_handle_registered = False 1678 1679 if not self.wake_up_handle_registered: 1680 self.wake_up_handle_registered = True 1681 await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self) 1682 1683 asyncio_loop = None 1684 need_to_ensure_asyncio_loop = False 1685 try: 1686 asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get() 1687 except AsyncioLoopWasNotSetError: 1688 need_to_ensure_asyncio_loop = True 1689 1690 coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop) 1691 coro.is_background_coro = True 1692 self.write_locked.add(env_id) 1693 self.write_locked_coro_id.add(coro.coro_id) 1694 self.envs_in_sync.add(env_id) 1695 1696 1697DbRequest.default_service_type = Db 1698 1699 1700def lmdb_reapplier(env_info: EnvInfo, handler: Callable, *args, **kwargs): 1701 environment: lmdb.Environment = env_info.env 1702 failed = True 1703 while failed: 1704 need_to_resize: bool = False 1705 try: 1706 handler(env_info, *args, **kwargs) 1707 failed = False 1708 except DBError as err: 1709 if isinstance(err.original_exception, lmdb.MapFullError): 1710 need_to_resize = True 1711 1712 if need_to_resize: 1713 environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
default_env_path_and_params: cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs =
<cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs object>
class
Db(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
503class Db(Service, EntityStatsMixin): 504 def __init__(self, loop: CoroSchedulerType): 505 super(Db, self).__init__(loop) 506 self.default_env_name: str = '__default__.dbenv' 507 self.default_envs_dir: str = 'db_envs' 508 # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict() 509 # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict() 510 self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict() 511 # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list() 512 self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict() 513 self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict() 514 # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list() 515 self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict() 516 self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict() 517 # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict() 518 self.max_data_cache_size: Union[None, int] = 10000 519 self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict() 520 self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict() 521 # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict() 522 self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 523 self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict() 524 self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict() 525 self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict() 526 # self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list() 527 self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict() 528 self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, 
Union[None, str], bool, Tuple, Dict]] = dict() 529 self.root_path_to_db_environments_rel: RelativePath = None 530 self.app_name_waiter: CoroWrapperBase = None 531 self.default_db_environment: lmdb.Environment = None 532 self.db_environments: Dict[EnvId, EnvInfo] = dict() 533 # self.databases: Dict[Hashable, Any] = dict() 534 # self.db_names: Dict[DbId, DbName] = dict() 535 self.async_loop = None 536 self.sync_time_interval = 1.0 537 self.last_sync_time = perf_counter() 538 self.envs_need_to_be_sync: Set[DbId] = set() 539 self.envs_in_sync: Set[DbId] = set() 540 self.force_sync: Set[EnvId] = set() 541 self.sync_an_each_write: bool = False 542 self.write_locked: Set[EnvId] = set() 543 self.writes_num: int = 0 544 self.reads_num: int = 0 545 self.deletes_num: int = 0 546 self.db_drops_num: int = 0 547 self.write_locked_coro_id: Set[CoroID] = set() 548 self.wake_up_handle_registered: bool = False 549 # self.serializer = best_serializer_for_standard_data(( 550 # DataFormats.binary, 551 # DataFormats.messagepack, 552 # Tags.can_use_bytes, 553 # Tags.decode_str_as_str, 554 # Tags.decode_list_as_list, 555 # Tags.decode_bytes_as_bytes, 556 # Tags.superficial, 557 # Tags.current_platform, 558 # Tags.multi_platform, 559 # ), 560 # TestDataType.small, 561 # 0.1) 562 self.serializer = Serializer(Serializers.msgspec_messagepack) 563 self._request_workers = { 564 0: self._on_set_root_path_to_db_environments, 565 1: self._on_open_databases, 566 2: self._on_drop_db, 567 3: self._on_sync, 568 4: self._on_get, 569 5: self._on_get_items, 570 6: self._on_get_all_items, 571 7: self._on_put, 572 8: self._on_put_items, 573 9: self._on_delete, 574 10: self._on_delete_kv, 575 11: self._on_delete_items, 576 12: self._on_delete_kv_items, 577 13: self._on_open_db_environment, 578 14: self._on_get_first, 579 15: self._on_get_last, 580 16: self._on_get_n_items, 581 17: self._on_get_items_range, 582 18: self._on_close_db_environment, 583 } 584 585 # TODO: sync with last implementation 586 def 
get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]: 587 return type(self).__name__, { 588 'db_env_ids': list(self.db_environments.keys()), 589 'writes num': self.writes_num, 590 'reads num': self.reads_num, 591 'deletes num': self.deletes_num, 592 'db drops num': self.db_drops_num, 593 } 594 595 def norm_key(self, key: InputKeyType) -> NormalizedKeyType: 596 return normalize_compound_key(key) 597 598 def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType: 599 raw_key: bytes = self.serializer.dumps(normalize_compound_key(key)) 600 if isinstance(env_or_id, lmdb.Environment): 601 env = env_or_id 602 else: 603 env = self.db_environments[env_or_id].env 604 605 if len(raw_key) > env.max_key_size(): 606 raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}') 607 608 def destroy(self): 609 # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues, 610 # sync envirounments and close them. Also we need to prevent new requests from being processed. 
611 db_environments_values = self.db_environments.values() 612 self.db_environments = type(self.db_environments)() 613 self.default_db_environment = None 614 for env_info in db_environments_values: 615 env_info.close() 616 617 def single_task_registration_or_immediate_processing( 618 self, *args, **kwargs) -> ServiceProcessingResponse: 619 result = self.try_resolve_request(*args, **kwargs) 620 if result is None: 621 self._ensure_default_db_environment() 622 return True, None, None 623 else: 624 return result 625 626 def _ensure_default_db_environment(self) -> bool: 627 if self.default_db_environment is None: 628 if self.root_path_to_db_environments_rel is None: 629 if self.app_name_waiter is None: 630 async def coro(i: Interface, self: 'Db'): 631 app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs')) 632 app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type')) 633 app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath)) 634 app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs) 635 self.root_path_to_db_environments_rel = RelativePath(RelativePath(app_data_dir_path)(self.default_envs_dir)) 636 self._init_default_db_env() 637 self.app_name_waiter = None 638 self.make_live() 639 640 self.app_name_waiter = put_root_from_other_service(self, coro, self) 641 # self.app_name_waiter.is_background_coro = True 642 643 self.make_dead() 644 return False 645 else: 646 self._init_default_db_env() 647 return True 648 else: 649 return True 650 651 def full_processing_iteration(self): 652 # TODO: combine all queues into one single transaction by env_id. Or at most two transactions: read and write. 653 # This will improve performance and will give ability to move transations to other threads or even processess if needed. 
654 655 # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 656 # them at most as once a milisecond (it is frequently engough). 657 658 if not self._ensure_default_db_environment(): 659 return 660 661 if self.force_sync: 662 self.envs_need_to_be_sync |= self.force_sync 663 self.force_sync = set() 664 665 put_queue_buff = self.put_queue 666 self.put_queue = type(put_queue_buff)() 667 668 data_cache_buff = self.data_cache # will be cleared at the end of the iteration if necessary 669 670 read_queue_buff = self.read_queue 671 self.read_queue = type(read_queue_buff)() 672 673 massive_read_queue_buff = self.massive_read_queue 674 self.massive_read_queue = type(massive_read_queue_buff)() 675 676 deletion_cache_buff = self.deletion_cache 677 self.deletion_cache = type(deletion_cache_buff)() 678 679 kv_deletion_cache_buff = self.kv_deletion_cache 680 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 681 682 get_all_items_queue_buff = self.get_all_items_queue 683 self.get_all_items_queue = type(get_all_items_queue_buff)() 684 685 get_first_queue_buff = self.get_first_queue 686 self.get_first_queue = type(get_first_queue_buff)() 687 688 get_last_queue_buff = self.get_last_queue 689 self.get_last_queue = type(get_last_queue_buff)() 690 691 get_n_items_queue_buff = self.get_n_items_queue 692 self.get_n_items_queue = type(get_n_items_queue_buff)() 693 694 get_items_range_queue_buff = self.get_items_range_queue 695 self.get_items_range_queue = type(get_items_range_queue_buff)() 696 697 open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests 698 self.open_db_environment_requests = type(open_db_environment_requests_buff)() 699 700 # open_db_environment 701 for coro_id, request_info in open_db_environment_requests_buff.items(): 702 env_id, env_path, can_be_written_externally, args, kwargs = request_info 703 if env_id in 
self.db_environments: 704 self.register_response(coro_id, self.db_environments[env_id]) 705 else: 706 exception = None 707 try: 708 result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs) 709 except: 710 exception = get_exception() 711 712 self.register_response(coro_id, result, exception) 713 714 # put 715 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]): 716 try: 717 with env_info.env.begin(write=True) as txn: 718 for db_id, db_put_info in put_info.items(): 719 if db_id in env_info.databases: 720 for raw_key, values in db_put_info.items(): 721 for value in values: 722 if isinstance(value, RawValueInfo): 723 value, dupdata, overwrite, append = value 724 else: 725 dupdata, overwrite, append = True, True, False 726 727 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append) 728 729 self.writes_num += len(db_put_info) 730 except lmdb.MapFullError: 731 raise DBError.from_exception(db_id) 732 733 for env_id, put_info in put_queue_buff.items(): 734 if env_id in self.db_environments: 735 self.envs_need_to_be_sync.add(env_id) 736 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 737 738 # TODO: implement replace* methods processing 739 740 # delete 741 for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items(): 742 if env_id in self.db_environments: 743 env_info = self.db_environments[env_id] 744 self.envs_need_to_be_sync.add(env_id) 745 with env_info.env.begin(write=True) as txn: 746 for db_id, del_keys in deletion_cache_buff_db_info.items(): 747 if db_id in env_info.databases: 748 for del_raw_key in del_keys: 749 txn.delete(del_raw_key, db=env_info.databases[db_id]) 750 self.deletes_num += 1 751 try: 752 data_cache_buff[env_id][db_id].pop(del_raw_key, None) 753 except KeyError: 754 pass 755 756 # delete_kv 757 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 758 
if env_id in self.db_environments: 759 env_info = self.db_environments[env_id] 760 self.envs_need_to_be_sync.add(env_id) 761 with env_info.env.begin(write=True) as txn: 762 for db_id, del_keys in kv_deletion_cache_buff_db_info.items(): 763 if db_id in env_info.databases: 764 for del_raw_key, del_raw_values in del_keys.items(): 765 for del_raw_value in del_raw_values: 766 txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id]) 767 self.deletes_num += 1 768 try: 769 raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key] 770 new_raw_values: List[Union[RawValueType, RawValueInfo]] = list() 771 for raw_value in raw_values: 772 raw_value_original = raw_value 773 if isinstance(raw_value, RawValueInfo): 774 raw_value, _, _, _ = raw_value 775 776 if raw_value != del_raw_value: 777 new_raw_values.append(raw_value_original) 778 779 if new_raw_values: 780 data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values 781 else: 782 data_cache_buff[env_id][db_id].pop(del_raw_key, None) 783 except KeyError: 784 pass 785 786 # drop 787 drop_db_requests_buff = self.drop_db_requests 788 self.drop_db_requests = type(drop_db_requests_buff)() 789 790 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]): 791 for db_id, db_drop_info in drop_info.items(): 792 delete_db, coro_id = db_drop_info 793 if db_id in env_info.databases: 794 try: 795 with env_info.env.begin(write=True) as txn: 796 txn.drop(db=env_info.databases[db_id], delete=delete_db) 797 if delete_db: 798 del env_info.databases[db_id] 799 del env_info.db_names[db_id] 800 801 self.db_drops_num += 1 802 except lmdb.MapFullError: 803 raise DBError.from_exception(db_id) 804 805 self.register_response(coro_id, None, UnknownEnvError(env_id)) 806 else: 807 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 808 809 for env_id, drop_info in drop_db_requests_buff.items(): 810 if env_id in self.db_environments: 811 
self.envs_need_to_be_sync.add(env_id) 812 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 813 else: 814 for db_id, db_drop_info in drop_info.items(): 815 delete_db, coro_id = db_drop_info 816 self.register_response(coro_id, None, UnknownEnvError(env_id)) 817 818 # get 819 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]: 820 key, db_id, env_id = key_info 821 need_to_get_from_db = True 822 try: 823 values = data_cache_buff[env_id][db_id][key] 824 if values: 825 value = values[0] 826 if isinstance(value, RawValueInfo): 827 value, _, _, _ = value 828 829 need_to_get_from_db = False 830 except KeyError: 831 pass 832 833 if need_to_get_from_db: 834 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 835 self.reads_num += 1 836 837 exception = None 838 try: 839 if value is None: 840 exception = DbKeyError(key_info) 841 else: 842 value = self.serializer.loads(value) 843 except: 844 exception = get_exception() 845 846 return value, exception 847 848 # _on_get 849 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 850 if env_id in self.db_environments: 851 env_info = self.db_environments[env_id] 852 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 853 if db_id in env_info.databases: 854 with env_info.env.begin() as txn: 855 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 856 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 857 for coro_id in coro_ids: 858 self.register_response(coro_id, value, exception) 859 else: 860 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 861 for coro_id in coro_ids: 862 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 863 else: 864 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 865 for raw_key, coro_ids in 
read_queue_buff_db_key_info.items(): 866 for coro_id in coro_ids: 867 self.register_response(coro_id, None, UnknownEnvError(env_id)) 868 869 # _on_get_items 870 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 871 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 872 if env_id in self.db_environments: 873 env_info = self.db_environments[env_id] 874 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 875 if coro_id not in results: 876 results[coro_id] = dict() 877 878 coro_results = results[coro_id] 879 for db_id, raw_keys in read_queue_buff_db_info.items(): 880 if db_id not in coro_results: 881 coro_results[db_id] = OrderedDict() 882 883 coro_db_results = coro_results[db_id] 884 if db_id in env_info.databases: 885 with env_info.env.begin() as txn: 886 for raw_key in raw_keys: 887 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 888 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 889 else: 890 for coro_id in coro_ids: 891 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 892 else: 893 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 894 for db_id, raw_keys in read_queue_buff_db_info.items(): 895 for coro_id in coro_ids: 896 self.register_response(coro_id, None, UnknownEnvError(env_id)) 897 898 for coro_id, coro_results in results.items(): 899 if coro_results: 900 db_id = tuple(coro_results.keys())[0] 901 self.register_response(coro_id, coro_results[db_id], None) 902 else: 903 self.register_response(coro_id, OrderedDict(), None) 904 905 # get all items 906 for env_id, requests_info in get_all_items_queue_buff.items(): 907 for request_info in requests_info: 908 coro_id, db_id = request_info 909 if env_id in self.db_environments: 910 env_info = self.db_environments[env_id] 911 env = env_info.env 912 if db_id in env_info.databases: 913 db = 
env_info.databases[db_id] 914 with env.begin(db=db) as txn: 915 result = list() 916 exception = None 917 try: 918 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)] 919 self.reads_num += len(result) 920 except: 921 exception = get_exception() 922 923 self.register_response(coro_id, result, exception) 924 else: 925 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 926 else: 927 self.register_response(coro_id, None, UnknownEnvError(env_id)) 928 929 # get_first 930 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 931 if env_id in self.db_environments: 932 env_info = self.db_environments[env_id] 933 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 934 if db_id in env_info.databases: 935 db = env_info.databases[db_id] 936 with env_info.env.begin(db=db) as txn: 937 result = None 938 exception = None 939 try: 940 cursor: lmdb.Cursor = txn.cursor() 941 if cursor.first(): 942 key = cursor.key() 943 key_found: bool = True 944 while key.startswith(EnvInfo.db_name_prefix_bytes): 945 key_found = cursor.next_nodup() 946 if not key_found: 947 break 948 949 key = cursor.key() 950 951 if key_found: 952 value = cursor.value() 953 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 954 self.reads_num += 1 955 else: 956 exception = KeyError() 957 else: 958 exception = KeyError() 959 except: 960 exception = get_exception() 961 962 for coro_id in coro_ids: 963 self.register_response(coro_id, result, exception) 964 else: 965 for coro_id in coro_ids: 966 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 967 else: 968 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 969 for coro_id in coro_ids: 970 self.register_response(coro_id, None, UnknownEnvError(env_id)) 971 972 # get_last 973 for env_id, get_last_queue_buff_db_info in 
get_last_queue_buff.items(): 974 if env_id in self.db_environments: 975 env_info = self.db_environments[env_id] 976 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 977 if db_id in env_info.databases: 978 db = env_info.databases[db_id] 979 with env_info.env.begin(db=db) as txn: 980 result = None 981 exception = None 982 try: 983 cursor: lmdb.Cursor = txn.cursor() 984 if cursor.last(): 985 key: bytes = cursor.key() 986 key_found: bool = True 987 key = cursor.key() 988 while key.startswith(EnvInfo.db_name_prefix_bytes): 989 key_found = cursor.prev_nodup() 990 if not key_found: 991 break 992 993 key = cursor.key() 994 995 if key_found: 996 value: bytes = cursor.value() 997 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 998 self.reads_num += 1 999 else: 1000 exception = KeyError() 1001 else: 1002 exception = KeyError() 1003 except: 1004 exception = get_exception() 1005 1006 for coro_id in coro_ids: 1007 self.register_response(coro_id, result, exception) 1008 else: 1009 for coro_id in coro_ids: 1010 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1011 else: 1012 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 1013 for coro_id in coro_ids: 1014 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1015 1016 # get_n_items 1017 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 1018 if env_id in self.db_environments: 1019 env_info = self.db_environments[env_id] 1020 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1021 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1022 db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info 1023 if db_id in env_info.databases: 1024 db = env_info.databases[db_id] 1025 with env_info.env.begin(db=db) as txn: 1026 exception = None 1027 try: 1028 cursor: lmdb.Cursor = txn.cursor() 1029 if cursor.set_range(first_desired_raw_key): 
1030 if reverse: 1031 cursor_iterator = cursor.iterprev(keys=True, values=True) 1032 else: 1033 cursor_iterator = cursor.iternext(keys=True, values=True) 1034 1035 for raw_key, raw_value in cursor_iterator: 1036 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1037 continue 1038 1039 if (num_items is not None) and (num_items <= 0): 1040 break 1041 1042 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1043 self.reads_num += 1 1044 if num_items is not None: 1045 num_items -= 1 1046 else: 1047 exception = KeyError() 1048 except: 1049 exception = get_exception() 1050 1051 self.register_response(coro_id, coro_results, exception) 1052 else: 1053 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1054 else: 1055 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1056 for coro_id in coro_ids: 1057 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1058 1059 # get_items_range 1060 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 1061 if env_id in self.db_environments: 1062 env_info = self.db_environments[env_id] 1063 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1064 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1065 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 1066 if db_id in env_info.databases: 1067 db = env_info.databases[db_id] 1068 with env_info.env.begin(db=db) as txn: 1069 exception = None 1070 try: 1071 cursor: lmdb.Cursor = txn.cursor() 1072 if cursor.set_range(first_desired_raw_key): 1073 if reverse: 1074 cursor_iterator = cursor.iterprev(keys=True, values=True) 1075 else: 1076 cursor_iterator = cursor.iternext(keys=True, values=True) 1077 1078 for raw_key, raw_value in cursor_iterator: 1079 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1080 continue 1081 1082 if reverse: 1083 if 
raw_key < last_desired_raw_key: 1084 break 1085 else: 1086 if raw_key > last_desired_raw_key: 1087 break 1088 1089 if (num_items is not None) and (num_items <= 0): 1090 break 1091 1092 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1093 self.reads_num += 1 1094 if num_items is not None: 1095 num_items -= 1 1096 else: 1097 exception = KeyError() 1098 except: 1099 exception = get_exception() 1100 1101 self.register_response(coro_id, coro_results, exception) 1102 else: 1103 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1104 else: 1105 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1106 for coro_id in coro_ids: 1107 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1108 1109 need_to_sync = self.sync_an_each_write 1110 1111 # periodic sync 1112 if (perf_counter() - self.last_sync_time) >= self.sync_time_interval: 1113 for env_id, env_info in self.db_environments.items(): 1114 self.envs_need_to_be_sync.add(env_id) 1115 need_to_sync = True 1116 1117 # sync 1118 if need_to_sync and self.envs_need_to_be_sync: 1119 envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync 1120 self.envs_need_to_be_sync = set(self.envs_in_sync) 1121 for env_id in envs_need_to_be_sync_bak: 1122 self.sync_in_thread_pool(env_id) 1123 1124 self.last_sync_time = perf_counter() 1125 1126 # invalidate data_cache for envs that can be written externally 1127 for env_id, env_info in self.db_environments.items(): 1128 if env_info.init_info.can_be_written_externally: 1129 self.data_cache.pop(env_id, None) 1130 1131 # clear too big caches 1132 for env_id, env_info in self.db_environments.items(): 1133 if env_id in self.data_cache: 1134 if len(self.data_cache[env_id]) > self.max_data_cache_size: 1135 del self.data_cache[env_id] 1136 1137 self.make_dead() 1138 1139 def in_work(self) -> bool: 1140 result: bool = bool(self.default_db_environment is None) 
\ 1141 or bool(self.open_db_environment_requests) \ 1142 or bool(self.get_first_queue) \ 1143 or bool(self.get_last_queue) \ 1144 or bool(self.get_n_items_queue) \ 1145 or bool(self.get_items_range_queue) \ 1146 or bool(self.read_queue) \ 1147 or bool(self.massive_read_queue) \ 1148 or bool(self.get_all_items_queue)\ 1149 or bool(self.force_sync) \ 1150 or bool(self.deletion_cache) \ 1151 or bool(self.kv_deletion_cache) \ 1152 or bool(self.drop_db_requests) \ 1153 or bool(self.put_queue) \ 1154 or bool(self.envs_need_to_be_sync) \ 1155 or ((perf_counter() - self.last_sync_time) >= self.sync_time_interval) 1156 return self.thrifty_in_work(result) 1157 1158 def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]: 1159 time_since_last_sync_time: float = perf_counter() - self.last_sync_time 1160 if self.sync_time_interval > time_since_last_sync_time: 1161 return True, self.sync_time_interval - time_since_last_sync_time 1162 else: 1163 return True, 0 1164 1165 def _init_db_env(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvInfo: 1166 if env_id in self.db_environments: 1167 return self.db_environments[env_id] 1168 1169 if env_path is None: 1170 if env_id is None: 1171 env_path = self.root_path_to_db_environments_rel(self.default_env_name) 1172 else: 1173 env_path = self.root_path_to_db_environments_rel(to_identifier(f'{env_id}')) 1174 else: 1175 if os.path.isabs(env_path): 1176 env_path = os.path.normpath(env_path) 1177 else: 1178 env_path = self.root_path_to_db_environments_rel(env_path) 1179 1180 env_init_info: EnvInitInfo = EnvInitInfo( 1181 env_id, 1182 env_path, 1183 can_be_written_externally, 1184 *args, **kwargs 1185 ) 1186 env_info: EnvInfo = EnvInfo(env_init_info) 1187 self.db_environments[env_info.env_id] = env_info 1188 env_info.env.sync(True) 1189 return env_info 1190 1191 def _init_default_db_env(self): 1192 args, kwargs = default_env_path_and_params() 1193 env_info: EnvInfo 
= self._init_db_env(None, *args, **kwargs) 1194 self.default_db_environment = env_info 1195 1196 def _on_open_db_environment(self, request: DbRequest, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> Tuple[bool, EnvInfo, Exception]: 1197 self.open_db_environment_requests[self.current_caller_coro_info.coro_id] = (env_id, env_path, can_be_written_externally, args, kwargs) 1198 self.make_live() 1199 return False, None, None 1200 1201 @check_request 1202 def _on_close_db_environment(self, request: DbRequest): 1203 env_id: EnvId = request.env_id 1204 1205 if env_id is None: 1206 return True, False, DefaultDbEnvironmentCanNotBeClosedManualyError() 1207 1208 if env_id in self.db_environments: 1209 self.db_environments[env_id].close() 1210 del self.db_environments[env_id] 1211 return True, True, None 1212 else: 1213 return True, False, UnknownEnvError(env_id) 1214 1215 def _on_set_root_path_to_db_environments(self, request: DbRequest, root_path_to_db_environments: str) -> ServiceProcessingResponse: 1216 self.root_path_to_db_environments_rel = RelativePath(root_path_to_db_environments) 1217 return True, True, None 1218 1219 @check_request 1220 def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse: 1221 if self.put_queue: 1222 self.force_sync.add(request.env_id) 1223 self.make_live() 1224 else: 1225 # self.default_db_environment.sync(True) 1226 self.sync_in_thread_pool(request.env_id) 1227 1228 return True, None, None 1229 1230 @check_request 1231 def _on_get(self, request: DbRequest, key: KeyType) -> ServiceProcessingResponse: 1232 coro_id = self.current_caller_coro_info.coro_id 1233 key = self.serializer.dumps(normalize_compound_key(key)) 1234 db_id = request.db_id 1235 env_id = request.env_id 1236 1237 if env_id in self.data_cache: 1238 env_cache = self.data_cache[env_id] 1239 if db_id in env_cache: 1240 db_cache = env_cache[db_id] 1241 if key in db_cache: 1242 values = db_cache[key] 1243 if values: 1244 value = 
values[0] 1245 if isinstance(value, RawValueInfo): 1246 value, _, _, _ = value 1247 1248 value = self.serializer.loads(value) 1249 return True, value, None 1250 1251 if env_id not in self.read_queue: 1252 self.read_queue[env_id] = dict() 1253 1254 read_queue_env = self.read_queue[env_id] 1255 if db_id not in read_queue_env: 1256 read_queue_env[db_id] = dict() 1257 1258 read_queue_env_db = read_queue_env[db_id] 1259 if key not in read_queue_env_db: 1260 read_queue_env_db[key] = set() 1261 1262 read_queue_env_db_key = read_queue_env_db[key] 1263 read_queue_env_db_key.add(coro_id) 1264 self.make_live() 1265 return False, None, None 1266 1267 @check_request 1268 def _on_get_first(self, request: DbRequest) -> ServiceProcessingResponse: 1269 coro_id = self.current_caller_coro_info.coro_id 1270 db_id = request.db_id 1271 env_id = request.env_id 1272 1273 if env_id not in self.get_first_queue: 1274 self.get_first_queue[env_id] = dict() 1275 1276 get_first_queue_env = self.get_first_queue[env_id] 1277 if db_id not in get_first_queue_env: 1278 get_first_queue_env[db_id] = set() 1279 1280 get_first_queue_env_db = get_first_queue_env[db_id] 1281 get_first_queue_env_db.add(coro_id) 1282 1283 self.make_live() 1284 return False, None, None 1285 1286 @check_request 1287 def _on_get_last(self, request: DbRequest) -> ServiceProcessingResponse: 1288 coro_id = self.current_caller_coro_info.coro_id 1289 db_id = request.db_id 1290 env_id = request.env_id 1291 1292 if env_id not in self.get_last_queue: 1293 self.get_last_queue[env_id] = dict() 1294 1295 get_last_queue_env = self.get_last_queue[env_id] 1296 if db_id not in get_last_queue_env: 1297 get_last_queue_env[db_id] = set() 1298 1299 get_last_queue_env_db = get_last_queue_env[db_id] 1300 get_last_queue_env_db.add(coro_id) 1301 1302 self.make_live() 1303 return False, None, None 1304 1305 @check_request 1306 def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> 
Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 1307 coro_id = self.current_caller_coro_info.coro_id 1308 desired_key = self.serializer.dumps(normalize_compound_key(desired_key)) 1309 db_id = request.db_id 1310 env_id = request.env_id 1311 1312 if env_id not in self.get_n_items_queue: 1313 self.get_n_items_queue[env_id] = dict() 1314 1315 get_n_items_queue_env = self.get_n_items_queue[env_id] 1316 get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse) 1317 self.make_live() 1318 return False, None, None 1319 1320 @check_request 1321 def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]: 1322 coro_id = self.current_caller_coro_info.coro_id 1323 first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key)) 1324 last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key)) 1325 db_id = request.db_id 1326 env_id = request.env_id 1327 1328 if env_id not in self.get_items_range_queue: 1329 self.get_items_range_queue[env_id] = dict() 1330 1331 get_items_range_queue_env = self.get_items_range_queue[env_id] 1332 get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse) 1333 self.make_live() 1334 return False, None, None 1335 1336 @check_request 1337 def _on_get_items(self, request: DbRequest, db_keys: Sequence[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]: 1338 coro_id = self.current_caller_coro_info.coro_id 1339 db_id = request.db_id 1340 env_id = request.env_id 1341 1342 if env_id not in self.massive_read_queue: 1343 self.massive_read_queue[env_id] = dict() 1344 1345 massive_read_queue_env = self.massive_read_queue[env_id] 1346 if coro_id not in massive_read_queue_env: 1347 
@check_request
def _on_get_items(self, request: DbRequest, db_keys: Sequence[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]:
    """Queue a bulk read of `db_keys` for the calling coroutine.

    Keys are normalized/serialized immediately and appended to the coroutine's
    per-db pending list (env_id -> coro_id -> db_id -> [raw keys]).
    """
    requester_id = self.current_caller_coro_info.coro_id
    per_coro = self.massive_read_queue.setdefault(request.env_id, dict())
    per_db = per_coro.setdefault(requester_id, dict())
    pending_raw_keys = per_db.setdefault(request.db_id, list())
    pending_raw_keys.extend(
        self.serializer.dumps(normalize_compound_key(key)) for key in db_keys)
    self.make_live()
    return False, None, None

@check_request
def _on_get_all_items(self, request: DbRequest) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
    """Queue a full-database dump request for the calling coroutine."""
    requester_id = self.current_caller_coro_info.coro_id
    pending = self.get_all_items_queue.setdefault(request.env_id, list())
    pending.append((requester_id, request.db_id))
    self.make_live()
    return False, None, None
@check_request
def _on_put(self, request: DbRequest, key: KeyType, value: Union[ValueType, ValueInfo]) -> Tuple[bool, RawValueType, Optional[Exception]]:
    """Buffer a single write for the next flush.

    The serialized value goes into both `put_queue` (what gets written) and
    `data_cache` (what concurrent reads see before the flush).  If the key
    already has a pending value and duplicates are not requested, the head
    pending value is replaced instead of appended.

    Returns (True, raw serialized value (or RawValueInfo), exception-or-None).
    """
    raw_key = self.serializer.dumps(normalize_compound_key(key))
    db_id = request.db_id
    env_id = request.env_id

    exception = None
    result = None
    try:
        pending = self.put_queue.setdefault(env_id, dict()).setdefault(db_id, dict()).setdefault(raw_key, list())
        cached = self.data_cache.setdefault(env_id, dict()).setdefault(db_id, dict()).setdefault(raw_key, list())
        result, replace_head = self._prepare_raw_put_value(value, pending)
        if replace_head:
            cached[0] = pending[0] = result
        else:
            pending.append(result)
            cached.append(result)
    except:
        exception = get_exception()

    self.make_live()
    return True, result, exception

def _prepare_raw_put_value(self, value: Union[ValueType, ValueInfo], pending: List) -> Tuple[Union[RawValueType, RawValueInfo], bool]:
    """Serialize `value` and decide whether it replaces the head pending entry.

    Replacement happens when something is already pending and duplicates are
    not requested (ValueInfo.dupdata False; plain values never duplicate).
    """
    if isinstance(value, ValueInfo):
        raw = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append)
        return raw, bool(pending) and not value.dupdata
    return self.serializer.dumps(value), bool(pending)

@check_request
def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
    """Buffer a batch of writes; per-key errors are reported, not raised.

    Fixes vs. original:
    * the ValueInfo path assigned `pending[0]`/`cached[0]` unconditionally
      before checking the lists were non-empty -> IndexError on any new key;
      now the head is only replaced when a pending entry exists (same rule
      as `_on_put`);
    * the duplicated `result_items` initialization block was removed;
    * the replace-head condition now checks the put queue (as `_on_put` does)
      rather than the data cache.
    """
    result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
    db_id = request.db_id
    env_id = request.env_id

    env_put_queue_db = self.put_queue.setdefault(env_id, dict()).setdefault(db_id, dict())
    env_data_cache_db = self.data_cache.setdefault(env_id, dict()).setdefault(db_id, dict())
    result_db_items = result_items.setdefault(db_id, dict())

    for key, value in db_items.items():
        # Serialization of the key is outside the per-item try, matching the
        # original: a malformed key aborts the whole request.
        raw_key = self.serializer.dumps(normalize_compound_key(key))

        exception = None
        result = None
        try:
            pending = env_put_queue_db.setdefault(raw_key, list())
            cached = env_data_cache_db.setdefault(raw_key, list())
            result, replace_head = self._prepare_raw_put_value(value, pending)
            if replace_head:
                cached[0] = pending[0] = result
            else:
                pending.append(result)
                cached.append(result)
        except:
            exception = get_exception()

        result_db_items[raw_key] = (result, exception)

    self.make_live()
    return True, result_items, None

@check_request
def _on_delete(self, request: DbRequest, key: KeyType) -> Tuple[bool, None, Optional[Exception]]:
    """Buffer deletion of `key`; reserved db-info keys are rejected immediately."""
    raw_key = self.serializer.dumps(normalize_compound_key(key))
    if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
        return True, None, KeyError(f'Can not delete special key (db info key): {raw_key}')

    exception = None
    try:
        pending_deletions = self.deletion_cache.setdefault(request.env_id, dict()).setdefault(request.db_id, set())
        pending_deletions.add(raw_key)
    except:
        exception = get_exception()

    self.make_live()
    return True, None, exception
@check_request
def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType) -> Tuple[bool, RawValueType, Optional[Exception]]:
    """Buffer deletion of one specific (key, value) pair (dupsort-style delete).

    Reserved db-info keys are rejected up front.  Returns
    (True, (raw_key, raw_value-or-None), exception-or-None).
    """
    raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
    if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
        return True, None, KeyError(f'Can not delete special raw_key (db info raw_key): {raw_key}')

    exception = None
    raw_value = None
    try:
        pending_values = self.kv_deletion_cache.setdefault(request.env_id, dict()) \
            .setdefault(request.db_id, dict()) \
            .setdefault(raw_key, list())
        raw_value = self.serializer.dumps(value)
        pending_values.append(raw_value)
    except:
        exception = get_exception()

    self.make_live()
    return True, (raw_key, raw_value), exception
@check_request
def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]:
    """Buffer deletion of several keys; per-key errors are reported, not raised.

    Fix vs. original: `raw_key` is initialized to None before the try block.
    Previously, if `serializer.dumps` raised, the trailing
    `result_items[key] = (raw_key, exception)` hit an unbound local (NameError)
    instead of recording the serialization failure.
    """
    result_items: Dict[InputKeyType, Tuple[RawKeyType, Optional[Exception]]] = dict()

    pending_deletions = self.deletion_cache.setdefault(request.env_id, dict()).setdefault(request.db_id, set())
    for key in db_items:
        exception = None
        raw_key = None  # must exist even if serialization below fails
        try:
            raw_key = self.serializer.dumps(normalize_compound_key(key))
            if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
                exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
            else:
                pending_deletions.add(raw_key)
        except:
            exception = get_exception()

        result_items[key] = (raw_key, exception)

    self.make_live()
    return True, result_items, None

@check_request
def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
    """Buffer deletion of several (key, value) pairs; per-key errors reported.

    Fix vs. original: the already-computed `raw_value` is appended instead of
    serializing `value` a second time.
    """
    result_items: Dict[InputKeyType, Tuple[RawKeyType, RawValueType, Optional[Exception]]] = dict()

    pending_by_key = self.kv_deletion_cache.setdefault(request.env_id, dict()).setdefault(request.db_id, dict())
    for key, value in db_items.items():
        exception = None
        raw_key = None
        raw_value = None
        try:
            raw_key = self.serializer.dumps(normalize_compound_key(key))
            if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
                exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
            else:
                raw_value = self.serializer.dumps(value)
                pending_by_key.setdefault(raw_key, list()).append(raw_value)
        except:
            exception = get_exception()

        result_items[key] = (raw_key, raw_value, exception)

    self.make_live()
    return True, result_items, None
def _on_open_databases(self, request: DbRequest, db_ids: Set[DbId], *args, **kwargs) -> ServiceProcessingResponse:
    """Open (or create) every DB in `db_ids` within the request's environment.

    Processed immediately (no queuing); extra args are forwarded to
    `EnvInfo.open_db`.  Unknown environments produce UnknownEnvError.
    """
    try:
        env_info: EnvInfo = self.db_environments[request.env_id]
    except KeyError:
        return True, None, UnknownEnvError(request.env_id)

    for db_id in db_ids:
        env_info.open_db(db_id, *args, **kwargs)

    # Force a durable flush so the new db-name records hit disk right away.
    env_info.env.sync(True)
    return True, None, None

@check_request
def _on_drop_db(self, request: DbRequest, delete: bool = False) -> ServiceProcessingResponse:
    """Queue a drop (clear, and optionally delete) of the requested DB.

    Pending drops accumulate as (delete_flag, waiting coro ids) per db.
    Fix vs. original: when upgrading the entry to delete=True, the new tuple
    is stored back into the queue dict.  Previously the rebuilt tuple was
    only bound to a local, so the delete flag was silently lost.
    """
    coro_id = self.current_caller_coro_info.coro_id

    drop_requests_env = self.drop_db_requests.setdefault(request.env_id, dict())
    entry: Tuple[bool, Set[CoroID]] = drop_requests_env.setdefault(request.db_id, (False, set()))

    if delete and not entry[0]:
        entry = (True, entry[1])
        drop_requests_env[request.db_id] = entry  # persist the upgraded flag

    entry[1].add(coro_id)
    self.make_live()
    return False, None, None
def sync_in_thread_pool(self, env_id: EnvId = None):
    """Launch a background coroutine that fsyncs `env_id`'s LMDB environment
    in a thread pool, then re-arms the periodic wake-up timer.

    While the sync is in flight the environment is marked write-locked
    (`write_locked` / `write_locked_coro_id` / `envs_in_sync`); the flags are
    cleared as the sync completes.
    """
    async def background_sync(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
        # Obtain an asyncio loop: create one if none was configured, otherwise
        # fetch the existing one on demand.
        if need_to_ensure_asyncio_loop:
            asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
        else:
            if asyncio_loop is None:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

        async def run_sync(self: 'Db', asyncio_loop, env: lmdb.Environment):
            def blocking_sync():
                # Runs in a worker thread: durable flush, then release the write lock.
                env.sync(True)
                self.write_locked.discard(env_id)

            await task_in_thread_pool(asyncio_loop, blocking_sync)

        target_env: lmdb.Environment = self.db_environments[env_id].env
        await i(AsyncioLoop, AsyncioLoopRequest().wait(run_sync(self, asyncio_loop, target_env)))
        self.write_locked_coro_id.discard(i.coro_id)
        self.envs_in_sync.discard(env_id)

        def make_service_live_for_a_next_sync(self: 'Db'):
            self.make_live()
            self.wake_up_handle_registered = False

        # Arm a single wake-up timer so the service runs another sync pass later.
        if not self.wake_up_handle_registered:
            self.wake_up_handle_registered = True
            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

    asyncio_loop = None
    need_to_ensure_asyncio_loop = False
    try:
        asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
    except AsyncioLoopWasNotSetError:
        need_to_ensure_asyncio_loop = True

    worker: CoroWrapperBase = put_root_from_other_service(self, background_sync, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
    worker.is_background_coro = True
    self.write_locked.add(env_id)
    self.write_locked_coro_id.add(worker.coro_id)
    self.envs_in_sync.add(env_id)
Db( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
def __init__(self, loop: CoroSchedulerType):
    """LMDB-backed key-value DB service.

    All requests are queued per environment and flushed in batches by the
    processing iteration; reads are answered from `data_cache` when a write
    for the same key is still pending.
    """
    super(Db, self).__init__(loop)
    # Default environment location (relative to the resolved root path).
    self.default_env_name: str = '__default__.dbenv'
    self.default_envs_dir: str = 'db_envs'
    # Pending-request queues, keyed by environment id first.
    self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict()
    self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
    self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict()
    self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
    # Mirror of pending writes so reads see not-yet-flushed values.
    self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
    self.max_data_cache_size: Union[None, int] = 10000
    self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
    self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict()
    self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
    self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
    self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
    self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
    self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict()
    self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = dict()
    # Environment bookkeeping.
    self.root_path_to_db_environments_rel: RelativePath = None
    self.app_name_waiter: CoroWrapperBase = None
    self.default_db_environment: lmdb.Environment = None
    self.db_environments: Dict[EnvId, EnvInfo] = dict()
    self.async_loop = None
    # Sync scheduling / write-lock state.
    self.sync_time_interval = 1.0
    self.last_sync_time = perf_counter()
    self.envs_need_to_be_sync: Set[DbId] = set()
    self.envs_in_sync: Set[DbId] = set()
    self.force_sync: Set[EnvId] = set()
    self.sync_an_each_write: bool = False
    self.write_locked: Set[EnvId] = set()
    # Counters, exposed through get_entity_stats().
    self.writes_num: int = 0
    self.reads_num: int = 0
    self.deletes_num: int = 0
    self.db_drops_num: int = 0
    self.write_locked_coro_id: Set[CoroID] = set()
    self.wake_up_handle_registered: bool = False
    # msgspec/messagepack was picked as the fastest suitable binary
    # serializer (previously chosen dynamically via best_serializer_for_standard_data).
    self.serializer = Serializer(Serializers.msgspec_messagepack)
    # Request-type code -> handler dispatch table (codes are part of the
    # DbRequest wire contract; do not renumber).
    self._request_workers = {
        0: self._on_set_root_path_to_db_environments,
        1: self._on_open_databases,
        2: self._on_drop_db,
        3: self._on_sync,
        4: self._on_get,
        5: self._on_get_items,
        6: self._on_get_all_items,
        7: self._on_put,
        8: self._on_put_items,
        9: self._on_delete,
        10: self._on_delete_kv,
        11: self._on_delete_items,
        12: self._on_delete_kv_items,
        13: self._on_open_db_environment,
        14: self._on_get_first,
        15: self._on_get_last,
        16: self._on_get_n_items,
        17: self._on_get_items_range,
        18: self._on_close_db_environment,
    }
put_queue: Dict[Hashable, Dict[Hashable, Dict[bytes, List[Union[bytes, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.RawValueInfo]]]]]
data_cache: Dict[Hashable, Dict[Hashable, Dict[bytes, List[Union[bytes, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.RawValueInfo]]]]]
root_path_to_db_environments_rel: cengal.file_system.path_manager.versions.v_0.path_manager.RelativePath
app_name_waiter: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase
db_environments: Dict[Hashable, EnvInfo]
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
    """Return (service name, snapshot of operation counters) for monitoring."""
    stats: Dict[str, Any] = {
        'db_env_ids': list(self.db_environments.keys()),
        'writes num': self.writes_num,
        'reads num': self.reads_num,
        'deletes num': self.deletes_num,
        'db drops num': self.db_drops_num,
    }
    return type(self).__name__, stats
def
norm_key( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Union[bytes, str, Tuple[Union[bytes, str]], tuple[Union[Tuple[Union[bytes, str]], Tuple[ForwardRef('NormalizedCompoundKeyType')]]]]:
def
raw_key( self, env_or_id: typing.Union[Environment, typing.Hashable], key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
    """Serialize `key` (after compound-key normalization) and validate its size.

    `env_or_id` may be an lmdb.Environment or an environment id to look up.
    Raises RawKeyIsTooLargeError when the serialized key exceeds the
    environment's max_key_size().

    Fix: the visible original never returned the computed key, so callers got
    None despite the RawKeyType annotation; `return raw_key` added.
    """
    raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
    if isinstance(env_or_id, lmdb.Environment):
        env = env_or_id
    else:
        env = self.db_environments[env_or_id].env

    if len(raw_key) > env.max_key_size():
        raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')

    return raw_key
def
destroy(self):
def destroy(self):
    """Detach and close every known LMDB environment.

    TODO: should be driven by a scheduler shutdown service that first drains
    pending queues, syncs the environments and blocks new requests.
    """
    # Detach first so nothing can reach an environment while it is closing.
    remaining_env_infos = self.db_environments.values()
    self.db_environments = type(self.db_environments)()
    self.default_db_environment = None
    for environment_info in remaining_env_infos:
        environment_info.close()
def
single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
def
full_processing_iteration(self):
651 def full_processing_iteration(self): 652 # TODO: combine all queues into one single transaction by env_id. Or at most two transactions: read and write. 653 # This will improve performance and will give ability to move transations to other threads or even processess if needed. 654 655 # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 656 # them at most as once a milisecond (it is frequently engough). 657 658 if not self._ensure_default_db_environment(): 659 return 660 661 if self.force_sync: 662 self.envs_need_to_be_sync |= self.force_sync 663 self.force_sync = set() 664 665 put_queue_buff = self.put_queue 666 self.put_queue = type(put_queue_buff)() 667 668 data_cache_buff = self.data_cache # will be cleared at the end of the iteration if necessary 669 670 read_queue_buff = self.read_queue 671 self.read_queue = type(read_queue_buff)() 672 673 massive_read_queue_buff = self.massive_read_queue 674 self.massive_read_queue = type(massive_read_queue_buff)() 675 676 deletion_cache_buff = self.deletion_cache 677 self.deletion_cache = type(deletion_cache_buff)() 678 679 kv_deletion_cache_buff = self.kv_deletion_cache 680 self.kv_deletion_cache = type(kv_deletion_cache_buff)() 681 682 get_all_items_queue_buff = self.get_all_items_queue 683 self.get_all_items_queue = type(get_all_items_queue_buff)() 684 685 get_first_queue_buff = self.get_first_queue 686 self.get_first_queue = type(get_first_queue_buff)() 687 688 get_last_queue_buff = self.get_last_queue 689 self.get_last_queue = type(get_last_queue_buff)() 690 691 get_n_items_queue_buff = self.get_n_items_queue 692 self.get_n_items_queue = type(get_n_items_queue_buff)() 693 694 get_items_range_queue_buff = self.get_items_range_queue 695 self.get_items_range_queue = type(get_items_range_queue_buff)() 696 697 open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests 698 
self.open_db_environment_requests = type(open_db_environment_requests_buff)() 699 700 # open_db_environment 701 for coro_id, request_info in open_db_environment_requests_buff.items(): 702 env_id, env_path, can_be_written_externally, args, kwargs = request_info 703 if env_id in self.db_environments: 704 self.register_response(coro_id, self.db_environments[env_id]) 705 else: 706 exception = None 707 try: 708 result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs) 709 except: 710 exception = get_exception() 711 712 self.register_response(coro_id, result, exception) 713 714 # put 715 def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]): 716 try: 717 with env_info.env.begin(write=True) as txn: 718 for db_id, db_put_info in put_info.items(): 719 if db_id in env_info.databases: 720 for raw_key, values in db_put_info.items(): 721 for value in values: 722 if isinstance(value, RawValueInfo): 723 value, dupdata, overwrite, append = value 724 else: 725 dupdata, overwrite, append = True, True, False 726 727 txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append) 728 729 self.writes_num += len(db_put_info) 730 except lmdb.MapFullError: 731 raise DBError.from_exception(db_id) 732 733 for env_id, put_info in put_queue_buff.items(): 734 if env_id in self.db_environments: 735 self.envs_need_to_be_sync.add(env_id) 736 lmdb_reapplier(self.db_environments[env_id], put_handler, put_info) 737 738 # TODO: implement replace* methods processing 739 740 # delete 741 for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items(): 742 if env_id in self.db_environments: 743 env_info = self.db_environments[env_id] 744 self.envs_need_to_be_sync.add(env_id) 745 with env_info.env.begin(write=True) as txn: 746 for db_id, del_keys in deletion_cache_buff_db_info.items(): 747 if db_id in env_info.databases: 748 for del_raw_key in del_keys: 749 
txn.delete(del_raw_key, db=env_info.databases[db_id]) 750 self.deletes_num += 1 751 try: 752 data_cache_buff[env_id][db_id].pop(del_raw_key, None) 753 except KeyError: 754 pass 755 756 # delete_kv 757 for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items(): 758 if env_id in self.db_environments: 759 env_info = self.db_environments[env_id] 760 self.envs_need_to_be_sync.add(env_id) 761 with env_info.env.begin(write=True) as txn: 762 for db_id, del_keys in kv_deletion_cache_buff_db_info.items(): 763 if db_id in env_info.databases: 764 for del_raw_key, del_raw_values in del_keys.items(): 765 for del_raw_value in del_raw_values: 766 txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id]) 767 self.deletes_num += 1 768 try: 769 raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key] 770 new_raw_values: List[Union[RawValueType, RawValueInfo]] = list() 771 for raw_value in raw_values: 772 raw_value_original = raw_value 773 if isinstance(raw_value, RawValueInfo): 774 raw_value, _, _, _ = raw_value 775 776 if raw_value != del_raw_value: 777 new_raw_values.append(raw_value_original) 778 779 if new_raw_values: 780 data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values 781 else: 782 data_cache_buff[env_id][db_id].pop(del_raw_key, None) 783 except KeyError: 784 pass 785 786 # drop 787 drop_db_requests_buff = self.drop_db_requests 788 self.drop_db_requests = type(drop_db_requests_buff)() 789 790 def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]): 791 for db_id, db_drop_info in drop_info.items(): 792 delete_db, coro_id = db_drop_info 793 if db_id in env_info.databases: 794 try: 795 with env_info.env.begin(write=True) as txn: 796 txn.drop(db=env_info.databases[db_id], delete=delete_db) 797 if delete_db: 798 del env_info.databases[db_id] 799 del env_info.db_names[db_id] 800 801 self.db_drops_num += 1 802 except lmdb.MapFullError: 803 raise DBError.from_exception(db_id) 
804 805 self.register_response(coro_id, None, UnknownEnvError(env_id)) 806 else: 807 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 808 809 for env_id, drop_info in drop_db_requests_buff.items(): 810 if env_id in self.db_environments: 811 self.envs_need_to_be_sync.add(env_id) 812 lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info) 813 else: 814 for db_id, db_drop_info in drop_info.items(): 815 delete_db, coro_id = db_drop_info 816 self.register_response(coro_id, None, UnknownEnvError(env_id)) 817 818 # get 819 def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]: 820 key, db_id, env_id = key_info 821 need_to_get_from_db = True 822 try: 823 values = data_cache_buff[env_id][db_id][key] 824 if values: 825 value = values[0] 826 if isinstance(value, RawValueInfo): 827 value, _, _, _ = value 828 829 need_to_get_from_db = False 830 except KeyError: 831 pass 832 833 if need_to_get_from_db: 834 value = txn.get(key, db=self.db_environments[env_id].databases[db_id]) 835 self.reads_num += 1 836 837 exception = None 838 try: 839 if value is None: 840 exception = DbKeyError(key_info) 841 else: 842 value = self.serializer.loads(value) 843 except: 844 exception = get_exception() 845 846 return value, exception 847 848 # _on_get 849 for env_id, read_queue_buff_db_info in read_queue_buff.items(): 850 if env_id in self.db_environments: 851 env_info = self.db_environments[env_id] 852 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 853 if db_id in env_info.databases: 854 with env_info.env.begin() as txn: 855 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 856 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 857 for coro_id in coro_ids: 858 self.register_response(coro_id, value, exception) 859 else: 860 for raw_key, coro_ids in 
read_queue_buff_db_key_info.items(): 861 for coro_id in coro_ids: 862 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 863 else: 864 for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items(): 865 for raw_key, coro_ids in read_queue_buff_db_key_info.items(): 866 for coro_id in coro_ids: 867 self.register_response(coro_id, None, UnknownEnvError(env_id)) 868 869 # _on_get_items 870 results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict() 871 for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items(): 872 if env_id in self.db_environments: 873 env_info = self.db_environments[env_id] 874 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 875 if coro_id not in results: 876 results[coro_id] = dict() 877 878 coro_results = results[coro_id] 879 for db_id, raw_keys in read_queue_buff_db_info.items(): 880 if db_id not in coro_results: 881 coro_results[db_id] = OrderedDict() 882 883 coro_db_results = coro_results[db_id] 884 if db_id in env_info.databases: 885 with env_info.env.begin() as txn: 886 for raw_key in raw_keys: 887 value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff) 888 coro_db_results[normalize_compound_key(raw_key)] = (value, exception) 889 else: 890 for coro_id in coro_ids: 891 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 892 else: 893 for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items(): 894 for db_id, raw_keys in read_queue_buff_db_info.items(): 895 for coro_id in coro_ids: 896 self.register_response(coro_id, None, UnknownEnvError(env_id)) 897 898 for coro_id, coro_results in results.items(): 899 if coro_results: 900 db_id = tuple(coro_results.keys())[0] 901 self.register_response(coro_id, coro_results[db_id], None) 902 else: 903 self.register_response(coro_id, OrderedDict(), None) 904 905 # get all items 906 for env_id, requests_info in 
get_all_items_queue_buff.items(): 907 for request_info in requests_info: 908 coro_id, db_id = request_info 909 if env_id in self.db_environments: 910 env_info = self.db_environments[env_id] 911 env = env_info.env 912 if db_id in env_info.databases: 913 db = env_info.databases[db_id] 914 with env.begin(db=db) as txn: 915 result = list() 916 exception = None 917 try: 918 result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)] 919 self.reads_num += len(result) 920 except: 921 exception = get_exception() 922 923 self.register_response(coro_id, result, exception) 924 else: 925 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 926 else: 927 self.register_response(coro_id, None, UnknownEnvError(env_id)) 928 929 # get_first 930 for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items(): 931 if env_id in self.db_environments: 932 env_info = self.db_environments[env_id] 933 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 934 if db_id in env_info.databases: 935 db = env_info.databases[db_id] 936 with env_info.env.begin(db=db) as txn: 937 result = None 938 exception = None 939 try: 940 cursor: lmdb.Cursor = txn.cursor() 941 if cursor.first(): 942 key = cursor.key() 943 key_found: bool = True 944 while key.startswith(EnvInfo.db_name_prefix_bytes): 945 key_found = cursor.next_nodup() 946 if not key_found: 947 break 948 949 key = cursor.key() 950 951 if key_found: 952 value = cursor.value() 953 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 954 self.reads_num += 1 955 else: 956 exception = KeyError() 957 else: 958 exception = KeyError() 959 except: 960 exception = get_exception() 961 962 for coro_id in coro_ids: 963 self.register_response(coro_id, result, exception) 964 else: 965 for coro_id in coro_ids: 966 self.register_response(coro_id, None, 
UnknownEnvDBError(env_id, db_id)) 967 else: 968 for db_id, coro_ids in get_first_queue_buff_db_info.items(): 969 for coro_id in coro_ids: 970 self.register_response(coro_id, None, UnknownEnvError(env_id)) 971 972 # get_last 973 for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items(): 974 if env_id in self.db_environments: 975 env_info = self.db_environments[env_id] 976 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 977 if db_id in env_info.databases: 978 db = env_info.databases[db_id] 979 with env_info.env.begin(db=db) as txn: 980 result = None 981 exception = None 982 try: 983 cursor: lmdb.Cursor = txn.cursor() 984 if cursor.last(): 985 key: bytes = cursor.key() 986 key_found: bool = True 987 key = cursor.key() 988 while key.startswith(EnvInfo.db_name_prefix_bytes): 989 key_found = cursor.prev_nodup() 990 if not key_found: 991 break 992 993 key = cursor.key() 994 995 if key_found: 996 value: bytes = cursor.value() 997 result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value())) 998 self.reads_num += 1 999 else: 1000 exception = KeyError() 1001 else: 1002 exception = KeyError() 1003 except: 1004 exception = get_exception() 1005 1006 for coro_id in coro_ids: 1007 self.register_response(coro_id, result, exception) 1008 else: 1009 for coro_id in coro_ids: 1010 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1011 else: 1012 for db_id, coro_ids in get_last_queue_buff_db_info.items(): 1013 for coro_id in coro_ids: 1014 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1015 1016 # get_n_items 1017 for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items(): 1018 if env_id in self.db_environments: 1019 env_info = self.db_environments[env_id] 1020 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1021 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1022 db_id, first_desired_raw_key, num_items, 
reverse = read_queue_buff_db_info 1023 if db_id in env_info.databases: 1024 db = env_info.databases[db_id] 1025 with env_info.env.begin(db=db) as txn: 1026 exception = None 1027 try: 1028 cursor: lmdb.Cursor = txn.cursor() 1029 if cursor.set_range(first_desired_raw_key): 1030 if reverse: 1031 cursor_iterator = cursor.iterprev(keys=True, values=True) 1032 else: 1033 cursor_iterator = cursor.iternext(keys=True, values=True) 1034 1035 for raw_key, raw_value in cursor_iterator: 1036 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1037 continue 1038 1039 if (num_items is not None) and (num_items <= 0): 1040 break 1041 1042 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1043 self.reads_num += 1 1044 if num_items is not None: 1045 num_items -= 1 1046 else: 1047 exception = KeyError() 1048 except: 1049 exception = get_exception() 1050 1051 self.register_response(coro_id, coro_results, exception) 1052 else: 1053 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1054 else: 1055 for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items(): 1056 for coro_id in coro_ids: 1057 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1058 1059 # get_items_range 1060 for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items(): 1061 if env_id in self.db_environments: 1062 env_info = self.db_environments[env_id] 1063 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1064 coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list() 1065 db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info 1066 if db_id in env_info.databases: 1067 db = env_info.databases[db_id] 1068 with env_info.env.begin(db=db) as txn: 1069 exception = None 1070 try: 1071 cursor: lmdb.Cursor = txn.cursor() 1072 if cursor.set_range(first_desired_raw_key): 1073 if reverse: 1074 cursor_iterator 
= cursor.iterprev(keys=True, values=True) 1075 else: 1076 cursor_iterator = cursor.iternext(keys=True, values=True) 1077 1078 for raw_key, raw_value in cursor_iterator: 1079 if raw_key.startswith(EnvInfo.db_name_prefix_bytes): 1080 continue 1081 1082 if reverse: 1083 if raw_key < last_desired_raw_key: 1084 break 1085 else: 1086 if raw_key > last_desired_raw_key: 1087 break 1088 1089 if (num_items is not None) and (num_items <= 0): 1090 break 1091 1092 coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value))) 1093 self.reads_num += 1 1094 if num_items is not None: 1095 num_items -= 1 1096 else: 1097 exception = KeyError() 1098 except: 1099 exception = get_exception() 1100 1101 self.register_response(coro_id, coro_results, exception) 1102 else: 1103 self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id)) 1104 else: 1105 for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items(): 1106 for coro_id in coro_ids: 1107 self.register_response(coro_id, None, UnknownEnvError(env_id)) 1108 1109 need_to_sync = self.sync_an_each_write 1110 1111 # periodic sync 1112 if (perf_counter() - self.last_sync_time) >= self.sync_time_interval: 1113 for env_id, env_info in self.db_environments.items(): 1114 self.envs_need_to_be_sync.add(env_id) 1115 need_to_sync = True 1116 1117 # sync 1118 if need_to_sync and self.envs_need_to_be_sync: 1119 envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync 1120 self.envs_need_to_be_sync = set(self.envs_in_sync) 1121 for env_id in envs_need_to_be_sync_bak: 1122 self.sync_in_thread_pool(env_id) 1123 1124 self.last_sync_time = perf_counter() 1125 1126 # invalidate data_cache for envs that can be written externally 1127 for env_id, env_info in self.db_environments.items(): 1128 if env_info.init_info.can_be_written_externally: 1129 self.data_cache.pop(env_id, None) 1130 1131 # clear too big caches 1132 for env_id, env_info in 
self.db_environments.items(): 1133 if env_id in self.data_cache: 1134 if len(self.data_cache[env_id]) > self.max_data_cache_size: 1135 del self.data_cache[env_id] 1136 1137 self.make_dead()
def in_work(self) -> bool:
    """Report whether the service currently has pending work.

    The service is considered busy when the default environment has not been
    opened yet, when any of its request queues/caches is non-empty, when some
    environments still await a sync, or when the periodic sync interval has
    elapsed since the last sync.

    Returns:
        bool: result of ``thrifty_in_work`` applied to the aggregated state.
    """
    pending_collections = (
        self.open_db_environment_requests,
        self.get_first_queue,
        self.get_last_queue,
        self.get_n_items_queue,
        self.get_items_range_queue,
        self.read_queue,
        self.massive_read_queue,
        self.get_all_items_queue,
        self.force_sync,
        self.deletion_cache,
        self.kv_deletion_cache,
        self.drop_db_requests,
        self.put_queue,
        self.envs_need_to_be_sync,
    )
    sync_interval_elapsed: bool = (perf_counter() - self.last_sync_time) >= self.sync_time_interval
    busy: bool = (
        (self.default_db_environment is None)
        or any(pending_collections)
        or sync_interval_elapsed
    )
    return self.thrifty_in_work(busy)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
Raises: NotImplementedError: raised by the base Service implementation when a subclass does not override this method
Returns: bool: True if the service still has pending work and must be kept in the active (live) loop; False otherwise
def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
    """Return how long until the next periodic sync is due.

    Returns:
        Tuple[bool, Optional[Union[int, float]]]: always ``(True, seconds)``,
        where ``seconds`` is the remaining portion of ``sync_time_interval``
        (``0`` when the interval has already elapsed).
    """
    elapsed: float = perf_counter() - self.last_sync_time
    remaining = self.sync_time_interval - elapsed
    # A positive remainder means the interval has not elapsed yet; otherwise
    # the next event is due immediately.
    return True, (remaining if remaining > 0 else 0)
def sync_in_thread_pool(self, env_id: EnvId = None):
    """Schedule an asynchronous `env.sync(True)` for `env_id` on a thread pool.

    Spawns a background root coroutine that flushes the LMDB environment to
    disk in a worker thread, then clears the per-env write locks and, if no
    wake-up timer is pending, arms one so the service goes live again for the
    next sync cycle.
    """
    async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
        # Obtain an asyncio loop: create one if none was set for the scheduler,
        # otherwise fall back to the already-registered loop.
        if need_to_ensure_asyncio_loop:
            asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
        else:
            if asyncio_loop is None:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

        async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
            def sync_worker():
                # Blocking, durable flush; runs in a pool thread so the
                # scheduler thread is never blocked on disk I/O.
                env.sync(True)
                # Release the write lock as soon as the data is on disk.
                self.write_locked.discard(env_id)

            await task_in_thread_pool(asyncio_loop, sync_worker)

        env: lmdb.Environment = self.db_environments[env_id].env
        await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop, env)))
        # Sync finished: drop this coroutine and env from the in-flight sets.
        self.write_locked_coro_id.discard(i.coro_id)
        self.envs_in_sync.discard(env_id)
        def make_service_live_for_a_next_sync(self: 'Db'):
            self.make_live()
            self.wake_up_handle_registered = False

        # Arm at most one wake-up timer at a time so the service is revived
        # after `sync_time_interval` for the next periodic sync.
        if not self.wake_up_handle_registered:
            self.wake_up_handle_registered = True
            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

    asyncio_loop = None
    need_to_ensure_asyncio_loop = False
    try:
        # Probe for an existing loop synchronously; absence is signalled by
        # AsyncioLoopWasNotSetError and handled inside the coroutine.
        asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
    except AsyncioLoopWasNotSetError:
        need_to_ensure_asyncio_loop = True

    coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
    coro.is_background_coro = True
    # Mark the environment as write-locked and in-sync BEFORE the coroutine
    # runs, so concurrent writers observe the lock immediately.
    self.write_locked.add(env_id)
    self.write_locked_coro_id.add(coro.coro_id)
    self.envs_in_sync.add(env_id)
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class DbRequest(ServiceRequest):
    """Request builder for the Db service.

    Each public method stores an integer opcode plus its arguments into a copy
    of this request (via ``_save_to_copy``); the Db service dispatches on that
    opcode. ``env_id``/``db_id`` select the target environment/database.
    """

    def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False):
        super().__init__()
        self.env_id: EnvId = env_id
        self.db_id: DbId = db_id
        self.needs_sync: bool = needs_sync
        self.provide_to_request_handler = True
        self.can_wait: bool = can_wait  # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.

    def _copy(self) -> 'DbRequest':
        # Fix: `can_wait` was previously dropped here, so every request copy
        # silently reset it to False regardless of the caller's setting.
        return DbRequest(self.env_id, self.db_id, self.needs_sync, self.can_wait)

    def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
        return self._save_to_copy(0, root_path_to_db_environments)

    def open_databases(self, db_ids: Set[DbId], *args, **kwargs) -> None:
        return self._save_to_copy(1, db_ids, *args, **kwargs)

    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
        return self._save_to_copy(2, db_id, delete)

    def sync(self) -> None:
        return self._save_to_copy(3)

    def wait_sync(self) -> None:  # TODO: implement
        return self._save_to_copy(22)

    def set_sync_timeout(self, timeout: RationalNumber) -> None:  # TODO: implement
        return self._save_to_copy(23, timeout)

    def get(self, key: InputKeyType) -> ValueType:
        return self._save_to_copy(4, key)

    def get_first(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns first item in DB
        return self._save_to_copy(14)

    def get_last(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns last item in DB
        return self._save_to_copy(15)

    def get_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(5, db_keys)

    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(16, desired_key, num, reverse=False)

    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(16, desired_key, num, reverse=True)

    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=False)

    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=True)

    def get_all_items(self) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        # Returns all items from DB
        return self._save_to_copy(6)

    def pop(self, key: InputKeyType) -> ValueType:  # TODO: implement
        return self._save_to_copy(24, key)

    def pop_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:  # TODO: implement
        return self._save_to_copy(25, db_keys)

    def put(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> RawValueType:
        return self._save_to_copy(7, key, value)

    def put_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        return self._save_to_copy(8, db_items)

    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
        return self._save_to_copy(19, key, value)

    inc = increment

    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
        # NOTE(review): opcode 8 is the same as put_items(); presumably this
        # placeholder should get its own opcode once implemented — confirm.
        return self._save_to_copy(8, db_items)

    inc_items = increment_items

    def replace(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> Tuple[RawValueType, ValueType]:  # TODO: implement
        return self._save_to_copy(20, key, value)

    def replace_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, ValueType, Optional[Exception]]]]:  # TODO: implement
        return self._save_to_copy(21, db_items)

    def delete(self, key: InputKeyType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(9, key)

    def delete_kv(self, key: InputKeyType, value: ValueType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(10, key, value)

    def delete_items(self, db_items: Set[InputKeyType]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(11, db_items)

    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(12, db_items)

    def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None:  # TODO: implement
        # Will execute given callable or coroutine in transaction (after all other queues will be processed)
        # It will run in current thread. So, it will block current thread until it will be finished if it is not a coroutine.
        # On the other hand it will lock environment and all databases in it until coroutine will be finished.
        return self._save_to_copy(29, env_id, callable_or_coro)

    def open_db_environment(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvId:
        return self._save_to_copy(13, env_id, env_path, can_be_written_externally, *args, **kwargs)

    def close_db_environment(self, env_id: EnvId) -> None:
        return self._save_to_copy(18, env_id)

    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Lock all databases if db_names is None. Databases will be released automatically when coroutine execution will be finished
        return self._save_to_copy(26, db_names)

    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:  # TODO: implement
        # Tries to lock all databases if db_names is None. Returns True if try was successful. False otherwise. Databases will be released automatically when coroutine execution will be finished
        return self._save_to_copy(27, db_names)

    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Unlock all databases if db_names is None
        return self._save_to_copy(28, db_names)
DbRequest( env_id: typing.Hashable = None, db_id: typing.Hashable = None, needs_sync: bool = False, can_wait: bool = False)
275 def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False): 276 super().__init__() 277 self.env_id: EnvId = env_id 278 self.db_id: DbId = db_id 279 self.needs_sync: bool = needs_sync 280 self.provide_to_request_handler = True 281 self.can_wait: bool = can_wait # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.
def
get( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Any:
def
get_first( self) -> tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def
get_last( self) -> tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def
get_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
get_n_items( self, desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
get_reverse_n_items( self, desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
get_items_range( self, first_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], 
typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
get_reverse_items_range( self, first_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], 
typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
get_all_items( self) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
pop( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Any:
def
pop_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def
put( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo, NoneType] = None) -> bytes:
def
put_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
increment( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[int, float] = 1) -> Union[int, float]:
def
inc( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[int, float] = 1) -> Union[int, float]:
def
increment_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[int, float, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Union[int, float], typing.Union[Exception, NoneType]]]:
def
inc_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[int, float, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Union[int, float], typing.Union[Exception, NoneType]]]:
def
replace( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo, NoneType] = None) -> Tuple[bytes, Any]:
def
replace_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Any, Union[Exception, NoneType]]]]:
def
delete( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
def
delete_kv( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Any) -> bytes:
def
delete_items( self, db_items: set[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
delete_kv_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Tuple[typing.Any]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def
execute_in_transaction( self, env_id: typing.Hashable, callable_or_coro: typing.Union[typing.Callable, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]]) -> None:
374 def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None: # TODO: implement 375 # Will execute given callable or coroutine in a transaction (after all other queues have been processed) 376 # It will run in the current thread. So, if it is not a coroutine, it will block the current thread until it finishes. 377 # On the other hand it will lock the environment and all databases in it until the coroutine finishes. 378 return self._save_to_copy(29, env_id, callable_or_coro)
def
open_db_environment( self, env_id: typing.Hashable, env_path: typing.Union[NoneType, str], can_be_written_externally: bool, *args, **kwargs) -> Hashable:
def
lock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> None:
def
try_lock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> bool:
390 def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool: # TODO: implement 391 # Tries to lock all databases if db_names is None. Returns True if the try was successful, False otherwise. Databases will be released automatically when coroutine execution finishes 392 return self._save_to_copy(27, db_names)
def
unlock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> None:
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'Db'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- interface
- i
- async_interface
- ai
KeyType =
typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]
RawKeyType =
<class 'bytes'>
ValueType =
typing.Any
RawValueType =
<class 'bytes'>
DbId =
typing.Hashable
EnvId =
typing.Hashable
DbName =
<class 'bytes'>
class
DbKeyError(builtins.KeyError):
185class DbKeyError(KeyError): 186 def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None: 187 super().__init__(*args) 188 self.key_info: Tuple[KeyType, DbId] = key_info
Mapping key not found.
DbKeyError( key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable], *args: object)
key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable]
Inherited Members
- builtins.BaseException
- with_traceback
- args
class
EnvInfo:
137class EnvInfo: 138 db_name_prefix = '__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__' 139 db_name_prefix_bytes = b'__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__' 140 141 def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None): 142 self.init_info: EnvInitInfo = init_info 143 if not os.path.exists(init_info.env_path) or (os.path.exists(init_info.env_path) and not os.path.isdir(init_info.env_path)): 144 os.makedirs(init_info.env_path) 145 146 self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env 147 self.env_id: EnvId = init_info.env_id 148 self.databases: Dict[DbId, lmdb._Database] = dict() 149 self.db_names: Dict[DbId, DbName] = dict() 150 self.prepare_main_db() 151 152 @staticmethod 153 def gen_db_name_from_db_id(db_id: DbId) -> Union[None, bytes]: 154 if db_id is None: 155 return None 156 157 return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8') 158 159 def db_name_by_db_id(self, db_id: DbId) -> Union[None, bytes]: 160 try: 161 return self.db_names[db_id] 162 except KeyError: 163 raise UnknownEnvDBError(self.env_id, db_id) 164 165 def db_by_db_id(self, db_id: DbId) -> lmdb._Database: 166 try: 167 return self.databases[db_id] 168 except KeyError: 169 raise UnknownEnvDBError(self.env_id, db_id) 170 171 def open_db(self, db_id: DbId, *args, **kwargs) -> lmdb._Database: 172 db_name: Union[None, bytes] = self.gen_db_name_from_db_id(db_id) 173 new_db: lmdb._Database = self.env.open_db(db_name, *args, **kwargs) 174 self.databases[db_id] = new_db 175 self.db_names[db_id] = db_name 176 return new_db 177 178 def prepare_main_db(self) -> lmdb._Database: 179 self.open_db(None) 180 181 def close(self): 182 self.env.close()
EnvInfo( init_info: cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.EnvInitInfo, env: typing.Union[Environment, NoneType] = None)
141 def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None): 142 self.init_info: EnvInitInfo = init_info 143 if not os.path.exists(init_info.env_path) or (os.path.exists(init_info.env_path) and not os.path.isdir(init_info.env_path)): 144 os.makedirs(init_info.env_path) 145 146 self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env 147 self.env_id: EnvId = init_info.env_id 148 self.databases: Dict[DbId, lmdb._Database] = dict() 149 self.db_names: Dict[DbId, DbName] = dict() 150 self.prepare_main_db()
init_info: cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.EnvInitInfo