cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19__all__ = ['default_env_path_and_params', 'Db', 'DbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType', 
  20           'DbId', 'EnvId', 'DbName', 'DbKeyError', 'EnvInfo']
  21
  22from cengal.parallel_execution.coroutines.coro_scheduler import *
  23from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
  24from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
  25from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
  26from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
  27from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
  28from cengal.parallel_execution.coroutines.coro_standard_services.instance import *
  29from cengal.file_system.file_manager import path_relative_to_current_dir
  30from cengal.time_management.cpu_clock_cycles import perf_counter
  31from cengal.data_manipulation.serialization import *
  32from cengal.introspection.inspect import get_exception
  33from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType
  34from cengal.file_system.path_manager import RelativePath
  35from cengal.code_flow_control.args_manager import ArgsKwargs
  36from cengal.text_processing.text_processing import to_identifier
  37from cengal.math.numbers import RationalNumber
  38from typing import Hashable, Tuple, List, Any, Dict, Callable, Sequence, NamedTuple, OrderedDict as OrderedDictType
  39import sys
  40import os
  41import asyncio
  42try:
  43    import lmdb
  44except ImportError:
  45    from warnings import warn
  46    warn('''WARNING: `lmdb` library is not installed. Db service will not work.
  47         To install `lmdb` use: `pip install lmdb`''')
  48    raise
  49
  50from os.path import normpath
  51from uuid import uuid4
  52from collections import OrderedDict, namedtuple
  53from functools import update_wrapper
  54import inspect
  55
  56
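"""
LMDB-backed `Db` coroutine service together with its `DbRequest` request builder,
key-normalization helpers and related exception types.
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""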
  61
  62__author__ = "ButenkoMS <gtalk@butenkoms.space>"
  63__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
  64__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
  65__license__ = "Apache License, Version 2.0"
  66__version__ = "4.4.1"
  67__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
  68__email__ = "gtalk@butenkoms.space"
  69# __status__ = "Prototype"
  70__status__ = "Development"
  71# __status__ = "Production"
  72
  73
  74SingleKeyType = Union[bytes, str]
  75CompoundKeyType = Union[List[SingleKeyType], Set[SingleKeyType], Tuple[SingleKeyType], List['CompoundKeyType'], Set['CompoundKeyType'], Tuple['CompoundKeyType']]
  76KeyType = Union[SingleKeyType, CompoundKeyType]
  77NormalizedCompoundKeyType = Union[Tuple[SingleKeyType], Tuple['NormalizedCompoundKeyType']]
  78NormalizedKeyType = Union[SingleKeyType, NormalizedCompoundKeyType]
  79RawKeyType = bytes  # By default record keys are limited to 511 bytes in length, however this can be adjusted by rebuilding the library. The compile-time key length can be queried via Environment.max_key_size()
  80ValueType = Any
  81RawValueType = bytes
  82DbId = Hashable
  83DbName = bytes
  84EnvId = Hashable
  85
  86
  87class KeyInfo:
  88    def __init__(self, key: KeyType, db_id: DbId = None, env_id: EnvId = None):
  89        self.key: KeyType = key
  90        self.db_id: DbId = db_id
  91        self.env_id: EnvId = env_id
  92
  93
  94ValueInfo = NamedTuple('ValueInfo', [('value', ValueType), ('dupdata', bool), ('overwrite', bool), ('append', bool)])
  95VI = ValueInfo
  96RawValueInfo = NamedTuple('RawValueInfo', [('value', RawValueType), ('dupdata', bool), ('overwrite', bool), ('append', bool)])
  97RVI = RawValueInfo
  98
  99
 100default_env_path_and_params: ArgsKwargs = ArgsKwargs(
 101    env_path=None,
 102    can_be_written_externally=False,
 103    map_size=20 * 1024**2, 
 104    writemap=True, 
 105    max_dbs=10,
 106    map_async=True, 
 107    lock=False, 
 108    metasync=False, 
 109    sync=False, 
 110    meminit=False,
 111)
 112
 113
 114default_env_params: ArgsKwargs = ArgsKwargs(
 115    can_be_written_externally=False,
 116    map_size=20 * 1024**2, 
 117    writemap=True, 
 118    max_dbs=10,
 119    map_async=True, 
 120    lock=False, 
 121    metasync=False, 
 122    sync=False, 
 123    meminit=False,
 124)
 125
 126
 127class EnvInitInfo:
 128    def __init__(self, env_id: EnvId, env_path: str, can_be_written_externally: bool, *args, **kwargs):
 129        self.env_id: EnvId = env_id
 130        self.env_path: str = normpath(env_path)
 131        self.can_be_written_externally: bool = can_be_written_externally
 132        self.args: Tuple = (self.env_path,) + args
 133        self.kwargs: Dict = kwargs
 134
 135
 136class EnvInfo:
 137    db_name_prefix = '__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
 138    db_name_prefix_bytes = b'__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
 139
 140    def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None):
 141        self.init_info: EnvInitInfo = init_info
 142        if not os.path.exists(init_info.env_path) or (os.path.exists(init_info.env_path) and not os.path.isdir(init_info.env_path)):
 143            os.makedirs(init_info.env_path)
 144        
 145        self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env
 146        self.env_id: EnvId = init_info.env_id
 147        self.databases: Dict[DbId, lmdb._Database] = dict()
 148        self.db_names: Dict[DbId, DbName] = dict()
 149        self.prepare_main_db()
 150    
 151    @staticmethod
 152    def gen_db_name_from_db_id(db_id: DbId) -> Union[None, bytes]:
 153        if db_id is None:
 154            return None
 155        
 156        return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8')
 157    
 158    def db_name_by_db_id(self, db_id: DbId) -> Union[None, bytes]:
 159        try:
 160            return self.db_names[db_id]
 161        except KeyError:
 162            raise UnknownEnvDBError(self.env_id, db_id)
 163    
 164    def db_by_db_id(self, db_id: DbId) -> lmdb._Database:
 165        try:
 166            return self.databases[db_id]
 167        except KeyError:
 168            raise UnknownEnvDBError(self.env_id, db_id)
 169    
 170    def open_db(self, db_id: DbId, *args, **kwargs) -> lmdb._Database:
 171            db_name: Union[None, bytes] = self.gen_db_name_from_db_id(db_id)
 172            new_db: lmdb._Database = self.env.open_db(db_name, *args, **kwargs)
 173            self.databases[db_id] = new_db
 174            self.db_names[db_id] = db_name
 175            return new_db
 176    
 177    def prepare_main_db(self) -> lmdb._Database:
 178        self.open_db(None)
 179    
 180    def close(self):
 181        self.env.close()
 182
 183
 184class DbKeyError(KeyError):
 185    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
 186        super().__init__(*args)
 187        self.key_info: Tuple[KeyType, DbId] = key_info
 188
 189
 190class DBError(Exception):
 191    def __init__(self, db_id: DbId, original_exception: Exception, *args):
 192        super().__init__(*args)
 193        self.db_id: DbId = db_id
 194        self.original_exception = original_exception
 195    
 196    @staticmethod
 197    def from_exception(db_id: DbId) -> 'DBError':
 198        return DBError(db_id, get_exception())
 199
 200
 201class UnknownEnvError(Exception):
 202    pass
 203
 204
 205class UnknownEnvDBError(Exception):
 206    pass
 207
 208
 209class WrongKeyTypeError(TypeError):
 210    pass
 211
 212
 213class RawKeyIsTooLargeError(ValueError):
 214    pass
 215
 216
 217class DefaultDbEnvironmentCanNotBeClosedManualyError(Exception):
 218    pass
 219
 220
 221class NormalizedCompoundKey:
 222    def __init__(self, normalized_key: NormalizedKeyType) -> None:
 223        self.normalized_key: NormalizedKeyType = normalized_key
 224    
 225    def __call__(self) -> Any:
 226        return self.normalized_key
 227
 228    @staticmethod
 229    def from_key(key: KeyType):
 230        return NormalizedCompoundKey(normalize_compound_key(key))
 231
 232
# Short alias for `NormalizedCompoundKey`.
NCK = NormalizedCompoundKey
# Key as accepted from callers: either an already normalized wrapper or any `KeyType` value.
InputKeyType = Union[NormalizedCompoundKey, KeyType]
 235
 236
 237def is_normalized_compound_key(key: KeyType) -> bool:
 238    if isinstance(key, (bytes, str, int, float)):
 239        return True
 240    elif isinstance(key, tuple):
 241        return all(is_normalized_compound_key(item) for item in key)
 242    else:
 243        return False
 244
 245
 246def normalize_compound_key(key: KeyType) -> NormalizedKeyType:
 247    if isinstance(key, NormalizedCompoundKey):
 248        return key()
 249    elif is_normalized_compound_key(key):
 250        return key
 251    elif isinstance(key, list):
 252        need_to_sort: bool = False
 253    elif isinstance(key, (set, frozenset)):
 254        need_to_sort = True
 255    elif isinstance(key, dict):
 256        key = tuple(key.items())
 257        need_to_sort = True
 258    else:
 259        raise WrongKeyTypeError(f'Wrong key type: {type(key)}: {key}')
 260
 261    new_key = list()
 262    for item in key:
 263        new_key.append(normalize_compound_key(item))
 264    
 265    if need_to_sort:
 266        new_key.sort()
 267    
 268    key = tuple(new_key)
 269
 270    return key
 271
 272
 273class DbRequest(ServiceRequest):
 274    def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False):
 275        super().__init__()
 276        self.env_id: EnvId = env_id
 277        self.db_id: DbId = db_id
 278        self.needs_sync: bool = needs_sync
 279        self.provide_to_request_handler = True
 280        self.can_wait: bool = can_wait  # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.
 281    
 282    def _copy(self) -> 'DbRequest':
 283        return DbRequest(self.env_id, self.db_id, self.needs_sync)
 284    
 285    def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
 286        return self._save_to_copy(0, root_path_to_db_environments)
 287    
 288    def open_databases(self, db_ids: Set[DbId], *args, **kwargs) -> None:
 289        return self._save_to_copy(1, db_ids, *args, **kwargs)
 290    
 291    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
 292        return self._save_to_copy(2, db_id, delete)
 293    
 294    def sync(self) -> None:
 295        return self._save_to_copy(3)
 296    
 297    def wait_sync(self) -> None:  # TODO: implement
 298        return self._save_to_copy(22)
 299    
 300    def set_sync_timeout(self, timeout: RationalNumber) -> None:  # TODO: implement
 301        return self._save_to_copy(23, timeout)
 302    
 303    def get(self, key: InputKeyType) -> ValueType:
 304        return self._save_to_copy(4, key)
 305    
 306    def get_first(self) -> Tuple[NormalizedKeyType, ValueType]:
 307        # Returns first item in DB
 308        return self._save_to_copy(14)
 309    
 310    def get_last(self) -> Tuple[NormalizedKeyType, ValueType]:
 311        # Returns last item in DB
 312        return self._save_to_copy(15)
 313    
 314    def get_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 315        return self._save_to_copy(5, db_keys)
 316    
 317    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 318        return self._save_to_copy(16, desired_key, num, reverse=False)
 319    
 320    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 321        return self._save_to_copy(16, desired_key, num, reverse=True)
 322    
 323    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 324        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=False)
 325    
 326    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 327        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=True)
 328    
 329    def get_all_items(self) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
 330        # Returns all items from DB
 331        return self._save_to_copy(6)
 332    
 333    def pop(self, key: InputKeyType) -> ValueType:  # TODO: implement
 334        return self._save_to_copy(24, key)
 335    
 336    def pop_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:  # TODO: implement
 337        return self._save_to_copy(25, db_keys)
 338    
 339    def put(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> RawValueType:
 340        return self._save_to_copy(7, key, value)
 341    
 342    def put_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
 343        return self._save_to_copy(8, db_items)
 344    
 345    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
 346        return self._save_to_copy(19, key, value)
 347    
 348    inc = increment
 349    
 350    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
 351        return self._save_to_copy(8, db_items)
 352    
 353    inc_items = increment_items
 354    
 355    def replace(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> Tuple[RawValueType, ValueType]:  # TODO: implement
 356        return self._save_to_copy(20, key, value)
 357    
 358    def replace_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, ValueType, Optional[Exception]]]]:  # TODO: implement
 359        return self._save_to_copy(21, db_items)
 360    
 361    def delete(self, key: InputKeyType) -> RawValueType:  # TODO: finish an implementation
 362        return self._save_to_copy(9, key)
 363    
 364    def delete_kv(self, key: InputKeyType, value: ValueType) -> RawValueType:  # TODO: finish an implementation
 365        return self._save_to_copy(10, key, value)
 366    
 367    def delete_items(self, db_items: Set[InputKeyType]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
 368        return self._save_to_copy(11, db_items)
 369    
 370    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
 371        return self._save_to_copy(12, db_items)
 372    
 373    def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None:  # TODO: implement
 374        # Will execute given callable or coroutine in transaction (after all other queues will be processed)
 375        # It will run in current thread. So, it will block current thread until it will be finished if it is not a coroutine.
 376        # On the other hand it will lock environment and all databases in it until coroutine will be finished.
 377        return self._save_to_copy(29, env_id, callable_or_coro)
 378
 379    def open_db_environment(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvId:
 380        return self._save_to_copy(13, env_id, env_path, can_be_written_externally, *args, **kwargs)
 381    
 382    def close_db_environment(self, env_id: EnvId) -> None:
 383        return self._save_to_copy(18, env_id)
 384    
 385    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
 386        # Lock all databases if db_names is None. Databases will be released automatically wnen coroutine execution will be finished
 387        return self._save_to_copy(26, db_names)
 388    
 389    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:  # TODO: implement
 390        # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished
 391        return self._save_to_copy(27, db_names)
 392    
 393    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
 394        # Unlock all databases if db_names is None
 395        return self._save_to_copy(28, db_names)
 396
 397
 398# class TransactionContextManager(DbRequest):
 399#     def __init__(self, *args, **kwargs):
 400#         super().__init__(*args, **kwargs)
 401#         self._journal_db_id: DbName = b'__journal__'
 402#         self._transaction: Optional[lmdb.Transaction] = None
 403#         self._transaction_context_manager: Optional[lmdb.Transaction] = None
 404#         self._transaction_history: List[DbRequest] = list()
 405#         self._committed: bool = False
 406
 407#     @staticmethod
 408#     def from_request(request: DbRequest) -> 'TransactionContextManager':
 409#         return TransactionContextManager(request.env_id, request.db_id)
 410    
 411#     def to_request(self) -> DbRequest:
 412#         return DbRequest(self.env_id, self.db_id)
 413    
 414#     @staticmethod
 415#     def request_to_data(request: DbRequest) -> Dict:
 416#         result = dict()
 417#         result['env_id'] = request.env_id
 418#         result['db_id'] = request.db_id
 419#         result['needs_sync'] = request.needs_sync
 420#         result['request_type'] = request.request_type
 421#         result['args'] = request.args
 422#         result['kwargs'] = request.kwargs
 423#         result['provide_to_request_handler'] = request.provide_to_request_handler
 424#         return result
 425    
 426#     @staticmethod
 427#     def request_from_data(data: Dict) -> DbRequest:
 428#         request = DbRequest(data['env_id'], data['db_id'], data['needs_sync'])
 429#         request.request_type = data['request_type']
 430#         request.args = data['args']
 431#         request.kwargs = data['kwargs']
 432#         request.provide_to_request_handler = data['provide_to_request_handler']
 433#         return request
 434    
 435#     async def _commit(self):
 436#         i: Interface = current_interface()
 437#         try:
 438#             await i(DbRequest(self.env_id, self._journal_db_id, True).lock_databases({self._journal_db_id}))
 439#             last_committed_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).get(1))
 440#             last_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).get(0))
 441#             our_id = last_id
 442#             for request in self._transaction_history:
 443#                 our_id += 1
 444#                 await i(DbRequest(self.env_id, self._journal_db_id, True).put(our_id, TransactionContextManager.request_to_data(request)))
 445
 446#             last_id: int = await i(DbRequest(self.env_id, self._journal_db_id, True).replace(0, our_id))
 447#             for request in self._transaction_history:
 448#                 await i(request)
 449#         except:
 450#             ...
 451#         finally:
 452#             await i(DbRequest(self.env_id, self._journal_db_id, True).unlock_databases({self._journal_db_id}))
 453    
 454#     def _save_to_copy(self, __request__type__: int, *args, **kwargs) -> ServiceRequest:
 455#         request: DbRequest = super()._save_to_copy(__request__type__, *args, **kwargs)
 456#         self._transaction_history.append(request)
 457#         return request
 458    
 459#     def __enter__(self) -> 'TransactionContextManager':
 460#         self._transaction = self.env.begin(write=True)
 461#         self._transaction_context_manager = self._transaction.__enter__()
 462#         return self
 463    
 464#     def __exit__(self, exc_type, exc_val, exc_tb) -> None:
 465#         self._transaction_context_manager.__exit__(exc_type, exc_val, exc_tb)
 466#         self._transaction = None
 467#         self._transaction_context_manager = None
 468    
 469#     async def __aenter__(self) -> 'TransactionContextManager':
 470#         i: Interface = current_interface()
 471#         db_service: Db = i._loop.get_service_instance(Db)
 472#         if self.env_id not in db_service.db_environments:
 473#             raise UnknownEnvError(self.env_id)
 474        
 475#         env_info: EnvInfo = self.db_environments[self.env_id]
 476#         if self._journal_db_id not in env_info.databases:
 477#             await i(self.to_request().open_databases({self._journal_db_id}))
 478
 479#         return self
 480    
 481#     async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
 482#         if (exc_type is None) and (exc_val is None) and (exc_tb is None):
 483#             await self._commit()
 484
 485
 486def check_request(method: Callable):
 487    def wrapper(self: 'Db', request: DbRequest, *args, **kwargs) -> ServiceProcessingResponse:
 488        if request.env_id not in self.db_environments:
 489            return True, None, UnknownEnvError(request.env_id)
 490        
 491        if request.db_id not in self.db_environments[request.env_id].databases:
 492            return True, None, UnknownEnvDBError(request.env_id, request.db_id)
 493        
 494        return method(self, request, *args, **kwargs)
 495    
 496    original_func_sign: inspect.Signature = inspect.signature(method)
 497    update_wrapper(wrapper, method)
 498    wrapper.__signature__ = original_func_sign.replace(parameters=tuple(original_func_sign.parameters.values()), return_annotation=original_func_sign.return_annotation)
 499    return wrapper
 500    
 501
 502class Db(Service, EntityStatsMixin):
 503    def __init__(self, loop: CoroSchedulerType):
 504        super(Db, self).__init__(loop)
 505        self.default_env_name: str = '__default__.dbenv'
 506        self.default_envs_dir: str = 'db_envs'
 507        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
 508        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
 509        self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict()
 510        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
 511        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
 512        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict()
 513        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
 514        self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
 515        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
 516        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
 517        self.max_data_cache_size: Union[None, int] = 10000
 518        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
 519        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict()
 520        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
 521        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 522        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 523        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
 524        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
 525        # self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
 526        self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict()
 527        self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = dict()
 528        self.root_path_to_db_environments_rel: RelativePath = None
 529        self.app_name_waiter: CoroWrapperBase = None
 530        self.default_db_environment: lmdb.Environment = None
 531        self.db_environments: Dict[EnvId, EnvInfo] = dict()
 532        # self.databases: Dict[Hashable, Any] = dict()
 533        # self.db_names: Dict[DbId, DbName] = dict()
 534        self.async_loop = None
 535        self.sync_time_interval = 1.0
 536        self.last_sync_time = perf_counter()
 537        self.envs_need_to_be_sync: Set[DbId] = set()
 538        self.envs_in_sync: Set[DbId] = set()
 539        self.force_sync: Set[EnvId] = set()
 540        self.sync_an_each_write: bool = False
 541        self.write_locked: Set[EnvId] = set()
 542        self.writes_num: int = 0
 543        self.reads_num: int = 0
 544        self.deletes_num: int = 0
 545        self.db_drops_num: int = 0
 546        self.write_locked_coro_id: Set[CoroID] = set()
 547        self.wake_up_handle_registered: bool = False
 548        # self.serializer = best_serializer_for_standard_data((
 549        #                                     DataFormats.binary,
 550        #                                     DataFormats.messagepack,
 551        #                                     Tags.can_use_bytes,
 552        #                                     Tags.decode_str_as_str,
 553        #                                     Tags.decode_list_as_list,
 554        #                                     Tags.decode_bytes_as_bytes,
 555        #                                     Tags.superficial,
 556        #                                     Tags.current_platform,
 557        #                                     Tags.multi_platform,
 558        #                                 ),
 559        #                                 TestDataType.small,
 560        #                                 0.1)
 561        self.serializer = Serializer(Serializers.msgspec_messagepack)
 562        self._request_workers = {
 563            0: self._on_set_root_path_to_db_environments,
 564            1: self._on_open_databases,
 565            2: self._on_drop_db,
 566            3: self._on_sync,
 567            4: self._on_get,
 568            5: self._on_get_items,
 569            6: self._on_get_all_items,
 570            7: self._on_put,
 571            8: self._on_put_items,
 572            9: self._on_delete,
 573            10: self._on_delete_kv,
 574            11: self._on_delete_items,
 575            12: self._on_delete_kv_items,
 576            13: self._on_open_db_environment,
 577            14: self._on_get_first,
 578            15: self._on_get_last,
 579            16: self._on_get_n_items,
 580            17: self._on_get_items_range,
 581            18: self._on_close_db_environment,
 582        }
 583
 584    # TODO: sync with last implementation
 585    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 586        return type(self).__name__, {
 587            'db_env_ids': list(self.db_environments.keys()),
 588            'writes num': self.writes_num,
 589            'reads num': self.reads_num,
 590            'deletes num': self.deletes_num,
 591            'db drops num': self.db_drops_num,
 592        }
 593    
 594    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
 595        return normalize_compound_key(key)
 596
 597    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
 598        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
 599        if isinstance(env_or_id, lmdb.Environment):
 600            env = env_or_id
 601        else:
 602            env = self.db_environments[env_or_id].env
 603        
 604        if len(raw_key) > env.max_key_size():
 605            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
 606
 607    def destroy(self):
 608        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
 609        # sync envirounments and close them. Also we need to prevent new requests from being processed.
 610        db_environments_values = self.db_environments.values()
 611        self.db_environments = type(self.db_environments)()
 612        self.default_db_environment = None
 613        for env_info in db_environments_values:
 614            env_info.close()
 615
 616    def single_task_registration_or_immediate_processing(
 617            self, *args, **kwargs) -> ServiceProcessingResponse:
 618        result = self.try_resolve_request(*args, **kwargs)
 619        if result is None:
 620            self._ensure_default_db_environment()
 621            return True, None, None
 622        else:
 623            return result
 624
 625    def _ensure_default_db_environment(self) -> bool:
 626        if self.default_db_environment is None:
 627            if self.root_path_to_db_environments_rel is None:
 628                if self.app_name_waiter is None:
 629                    async def coro(i: Interface, self: 'Db'):
 630                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
 631                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
 632                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
 633                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
 634                        self.root_path_to_db_environments_rel = RelativePath(RelativePath(app_data_dir_path)(self.default_envs_dir))
 635                        self._init_default_db_env()
 636                        self.app_name_waiter = None
 637                        self.make_live()
 638                    
 639                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
 640                    # self.app_name_waiter.is_background_coro = True
 641                
 642                self.make_dead()
 643                return False
 644            else:
 645                self._init_default_db_env()
 646                return True
 647        else:
 648            return True
 649
 650    def full_processing_iteration(self):
 651        # TODO: combine all queues into one single transaction by env_id. Or at most two transactions: read and write.
 652        # This will improve performance and will give ability to move transations to other threads or even processess if needed.
 653
 654        # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 
 655        # them at most as once a milisecond (it is frequently engough).
 656
 657        if not self._ensure_default_db_environment():
 658            return
 659        
 660        if self.force_sync:
 661            self.envs_need_to_be_sync |= self.force_sync
 662            self.force_sync = set()
 663        
 664        put_queue_buff = self.put_queue
 665        self.put_queue = type(put_queue_buff)()
 666        
 667        data_cache_buff = self.data_cache  # will be cleared at the end of the iteration if necessary
 668        
 669        read_queue_buff = self.read_queue
 670        self.read_queue = type(read_queue_buff)()
 671        
 672        massive_read_queue_buff = self.massive_read_queue
 673        self.massive_read_queue = type(massive_read_queue_buff)()
 674        
 675        deletion_cache_buff = self.deletion_cache
 676        self.deletion_cache = type(deletion_cache_buff)()
 677        
 678        kv_deletion_cache_buff = self.kv_deletion_cache
 679        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
 680        
 681        get_all_items_queue_buff = self.get_all_items_queue
 682        self.get_all_items_queue = type(get_all_items_queue_buff)()
 683
 684        get_first_queue_buff = self.get_first_queue
 685        self.get_first_queue = type(get_first_queue_buff)()
 686
 687        get_last_queue_buff = self.get_last_queue
 688        self.get_last_queue = type(get_last_queue_buff)()
 689
 690        get_n_items_queue_buff = self.get_n_items_queue
 691        self.get_n_items_queue = type(get_n_items_queue_buff)()
 692
 693        get_items_range_queue_buff = self.get_items_range_queue
 694        self.get_items_range_queue = type(get_items_range_queue_buff)()
 695
 696        open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests
 697        self.open_db_environment_requests = type(open_db_environment_requests_buff)()
 698
 699        # open_db_environment
 700        for coro_id, request_info in open_db_environment_requests_buff.items():
 701            env_id, env_path, can_be_written_externally, args, kwargs = request_info
 702            if env_id in self.db_environments:
 703                self.register_response(coro_id, self.db_environments[env_id])
 704            else:
 705                exception = None
 706                try:
 707                    result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs)
 708                except:
 709                    exception = get_exception()
 710
 711                self.register_response(coro_id, result, exception)
 712
 713        # put
 714        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]):
 715            try:
 716                with env_info.env.begin(write=True) as txn:
 717                    for db_id, db_put_info in put_info.items():
 718                        if db_id in env_info.databases:
 719                            for raw_key, values in db_put_info.items():
 720                                for value in values:
 721                                    if isinstance(value, RawValueInfo):
 722                                        value, dupdata, overwrite, append = value
 723                                    else:
 724                                        dupdata, overwrite, append = True, True, False
 725                                    
 726                                    txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append)
 727                        
 728                        self.writes_num += len(db_put_info)
 729            except lmdb.MapFullError:
 730                raise DBError.from_exception(db_id)
 731        
 732        for env_id, put_info in put_queue_buff.items():
 733            if env_id in self.db_environments:
 734                self.envs_need_to_be_sync.add(env_id)
 735                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
 736        
 737        # TODO: implement replace* methods processing
 738
 739        # delete
 740        for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items():
 741            if env_id in self.db_environments:
 742                env_info = self.db_environments[env_id]
 743                self.envs_need_to_be_sync.add(env_id)
 744                with env_info.env.begin(write=True) as txn:
 745                    for db_id, del_keys in deletion_cache_buff_db_info.items():
 746                        if db_id in env_info.databases:
 747                            for del_raw_key in del_keys:
 748                                txn.delete(del_raw_key, db=env_info.databases[db_id])
 749                                self.deletes_num += 1
 750                                try:
 751                                    data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 752                                except KeyError:
 753                                    pass
 754
 755        # delete_kv
 756        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
 757            if env_id in self.db_environments:
 758                env_info = self.db_environments[env_id]
 759                self.envs_need_to_be_sync.add(env_id)
 760                with env_info.env.begin(write=True) as txn:
 761                    for db_id, del_keys in kv_deletion_cache_buff_db_info.items():
 762                        if db_id in env_info.databases:
 763                            for del_raw_key, del_raw_values in del_keys.items():
 764                                for del_raw_value in del_raw_values:
 765                                    txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id])
 766                                    self.deletes_num += 1
 767                                    try:
 768                                        raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key]
 769                                        new_raw_values: List[Union[RawValueType, RawValueInfo]] = list()
 770                                        for raw_value in raw_values:
 771                                            raw_value_original = raw_value
 772                                            if isinstance(raw_value, RawValueInfo):
 773                                                raw_value, _, _, _ = raw_value
 774                                            
 775                                            if raw_value != del_raw_value:
 776                                                new_raw_values.append(raw_value_original)
 777                                            
 778                                        if new_raw_values:
 779                                            data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values
 780                                        else:
 781                                            data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 782                                    except KeyError:
 783                                        pass
 784
 785        # drop
 786        drop_db_requests_buff = self.drop_db_requests
 787        self.drop_db_requests = type(drop_db_requests_buff)()
 788        
 789        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]):
 790            for db_id, db_drop_info in drop_info.items():
 791                delete_db, coro_id = db_drop_info
 792                if db_id in env_info.databases:
 793                    try:
 794                        with env_info.env.begin(write=True) as txn:
 795                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
 796                            if delete_db:
 797                                del env_info.databases[db_id]
 798                                del env_info.db_names[db_id]
 799                        
 800                        self.db_drops_num += 1
 801                    except lmdb.MapFullError:
 802                        raise DBError.from_exception(db_id)
 803                    
 804                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 805                else:
 806                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 807        
 808        for env_id, drop_info in drop_db_requests_buff.items():
 809            if env_id in self.db_environments:
 810                self.envs_need_to_be_sync.add(env_id)
 811                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
 812            else:
 813                for db_id, db_drop_info in drop_info.items():
 814                    delete_db, coro_id = db_drop_info
 815                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 816
 817        # get
 818        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]:
 819            key, db_id, env_id = key_info
 820            need_to_get_from_db = True
 821            try:
 822                values = data_cache_buff[env_id][db_id][key]
 823                if values:
 824                    value = values[0]
 825                    if isinstance(value, RawValueInfo):
 826                        value, _, _, _ = value
 827
 828                    need_to_get_from_db = False
 829            except KeyError:
 830                pass
 831            
 832            if need_to_get_from_db:
 833                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
 834                self.reads_num += 1
 835            
 836            exception = None
 837            try:
 838                if value is None:
 839                    exception = DbKeyError(key_info)
 840                else:
 841                    value = self.serializer.loads(value)
 842            except:
 843                exception = get_exception()
 844            
 845            return value, exception
 846        
 847        # _on_get
 848        for env_id, read_queue_buff_db_info in read_queue_buff.items():
 849            if env_id in self.db_environments:
 850                env_info = self.db_environments[env_id]
 851                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 852                    if db_id in env_info.databases:
 853                        with env_info.env.begin() as txn:
 854                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 855                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 856                                for coro_id in coro_ids:
 857                                    self.register_response(coro_id, value, exception)
 858                    else:
 859                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 860                            for coro_id in coro_ids:
 861                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 862            else:
 863                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 864                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 865                        for coro_id in coro_ids:
 866                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 867        
 868        # _on_get_items
 869        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
 870        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
 871            if env_id in self.db_environments:
 872                env_info = self.db_environments[env_id]
 873                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 874                    if coro_id not in results:
 875                        results[coro_id] = dict()
 876                    
 877                    coro_results = results[coro_id]
 878                    for db_id, raw_keys in read_queue_buff_db_info.items():
 879                        if db_id not in coro_results:
 880                            coro_results[db_id] = OrderedDict()
 881                        
 882                        coro_db_results = coro_results[db_id]
 883                        if db_id in env_info.databases:
 884                            with env_info.env.begin() as txn:
 885                                for raw_key in raw_keys:
 886                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 887                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
 888                        else:
 889                            for coro_id in coro_ids:
 890                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 891            else:
 892                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 893                    for db_id, raw_keys in read_queue_buff_db_info.items():
 894                        for coro_id in coro_ids:
 895                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 896        
 897        for coro_id, coro_results in results.items():
 898            if coro_results:
 899                db_id = tuple(coro_results.keys())[0]
 900                self.register_response(coro_id, coro_results[db_id], None)
 901            else:
 902                self.register_response(coro_id, OrderedDict(), None)
 903
 904        # get all items
 905        for env_id, requests_info in get_all_items_queue_buff.items():
 906            for request_info in requests_info:
 907                coro_id, db_id = request_info
 908                if env_id in self.db_environments:
 909                    env_info = self.db_environments[env_id]
 910                    env = env_info.env
 911                    if db_id in env_info.databases:
 912                        db = env_info.databases[db_id]
 913                        with env.begin(db=db) as txn:
 914                            result = list()
 915                            exception = None
 916                            try:
 917                                result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)]
 918                                self.reads_num += len(result)
 919                            except:
 920                                exception = get_exception()
 921                            
 922                            self.register_response(coro_id, result, exception)
 923                    else:
 924                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 925                else:
 926                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 927        
 928        # get_first
 929        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
 930            if env_id in self.db_environments:
 931                env_info = self.db_environments[env_id]
 932                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 933                    if db_id in env_info.databases:
 934                        db = env_info.databases[db_id]
 935                        with env_info.env.begin(db=db) as txn:
 936                            result = None
 937                            exception = None
 938                            try:
 939                                cursor: lmdb.Cursor = txn.cursor()
 940                                if cursor.first():
 941                                    key = cursor.key()
 942                                    key_found: bool = True
 943                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 944                                        key_found = cursor.next_nodup()
 945                                        if not key_found:
 946                                            break
 947
 948                                        key = cursor.key()
 949
 950                                    if key_found:
 951                                        value = cursor.value()
 952                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 953                                        self.reads_num += 1
 954                                    else:
 955                                        exception = KeyError()
 956                                else:
 957                                    exception = KeyError()
 958                            except:
 959                                exception = get_exception()
 960                            
 961                            for coro_id in coro_ids:
 962                                self.register_response(coro_id, result, exception)
 963                    else:
 964                        for coro_id in coro_ids:
 965                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 966            else:
 967                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 968                    for coro_id in coro_ids:
 969                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 970        
 971        # get_last
 972        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
 973            if env_id in self.db_environments:
 974                env_info = self.db_environments[env_id]
 975                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 976                    if db_id in env_info.databases:
 977                        db = env_info.databases[db_id]
 978                        with env_info.env.begin(db=db) as txn:
 979                            result = None
 980                            exception = None
 981                            try:
 982                                cursor: lmdb.Cursor = txn.cursor()
 983                                if cursor.last():
 984                                    key: bytes = cursor.key()
 985                                    key_found: bool = True
 986                                    key = cursor.key()
 987                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 988                                        key_found = cursor.prev_nodup()
 989                                        if not key_found:
 990                                            break
 991
 992                                        key = cursor.key()
 993
 994                                    if key_found:
 995                                        value: bytes = cursor.value()
 996                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 997                                        self.reads_num += 1
 998                                    else:
 999                                        exception = KeyError()
1000                                else:
1001                                    exception = KeyError()
1002                            except:
1003                                exception = get_exception()
1004                            
1005                            for coro_id in coro_ids:
1006                                self.register_response(coro_id, result, exception)
1007                    else:
1008                        for coro_id in coro_ids:
1009                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1010            else:
1011                for db_id, coro_ids in get_last_queue_buff_db_info.items():
1012                    for coro_id in coro_ids:
1013                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1014        
1015        # get_n_items
1016        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
1017            if env_id in self.db_environments:
1018                env_info = self.db_environments[env_id]
1019                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1020                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1021                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1022                    if db_id in env_info.databases:
1023                        db = env_info.databases[db_id]
1024                        with env_info.env.begin(db=db) as txn:
1025                            exception = None
1026                            try:
1027                                cursor: lmdb.Cursor = txn.cursor()
1028                                if cursor.set_range(first_desired_raw_key):
1029                                    if reverse:
1030                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1031                                    else:
1032                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1033                                    
1034                                    for raw_key, raw_value in cursor_iterator:
1035                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1036                                            continue
1037
1038                                        if (num_items is not None) and (num_items <= 0):
1039                                            break
1040                                        
1041                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1042                                        self.reads_num += 1
1043                                        if num_items is not None:
1044                                            num_items -= 1
1045                                else:
1046                                    exception = KeyError()
1047                            except:
1048                                exception = get_exception()
1049                            
1050                            self.register_response(coro_id, coro_results, exception)
1051                    else:
1052                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1053            else:
1054                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1055                    for coro_id in coro_ids:
1056                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1057        
1058        # get_items_range
1059        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
1060            if env_id in self.db_environments:
1061                env_info = self.db_environments[env_id]
1062                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1063                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1064                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1065                    if db_id in env_info.databases:
1066                        db = env_info.databases[db_id]
1067                        with env_info.env.begin(db=db) as txn:
1068                            exception = None
1069                            try:
1070                                cursor: lmdb.Cursor = txn.cursor()
1071                                if cursor.set_range(first_desired_raw_key):
1072                                    if reverse:
1073                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1074                                    else:
1075                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1076                                    
1077                                    for raw_key, raw_value in cursor_iterator:
1078                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1079                                            continue
1080                                        
1081                                        if reverse:
1082                                            if raw_key < last_desired_raw_key:
1083                                                break
1084                                        else:
1085                                            if raw_key > last_desired_raw_key:
1086                                                break
1087
1088                                        if (num_items is not None) and (num_items <= 0):
1089                                            break
1090
1091                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1092                                        self.reads_num += 1
1093                                        if num_items is not None:
1094                                            num_items -= 1
1095                                else:
1096                                    exception = KeyError()
1097                            except:
1098                                exception = get_exception()
1099                            
1100                            self.register_response(coro_id, coro_results, exception)
1101                    else:
1102                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1103            else:
1104                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1105                    for coro_id in coro_ids:
1106                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1107        
1108        need_to_sync = self.sync_an_each_write
1109
1110        # periodic sync
1111        if (perf_counter() - self.last_sync_time) >= self.sync_time_interval:
1112            for env_id, env_info in self.db_environments.items():
1113                self.envs_need_to_be_sync.add(env_id)
1114                need_to_sync = True
1115
1116        # sync
1117        if need_to_sync and self.envs_need_to_be_sync:
1118            envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync
1119            self.envs_need_to_be_sync = set(self.envs_in_sync)
1120            for env_id in envs_need_to_be_sync_bak:
1121                self.sync_in_thread_pool(env_id)
1122            
1123            self.last_sync_time = perf_counter()
1124        
1125        # invalidate data_cache for envs that can be written externally
1126        for env_id, env_info in self.db_environments.items():
1127            if env_info.init_info.can_be_written_externally:
1128                self.data_cache.pop(env_id, None)
1129        
1130        # clear too big caches
1131        for env_id, env_info in self.db_environments.items():
1132            if env_id in self.data_cache:
1133                if len(self.data_cache[env_id]) > self.max_data_cache_size:
1134                    del self.data_cache[env_id]
1135
1136        self.make_dead()
1137
1138    def in_work(self) -> bool:
1139        result: bool = bool(self.default_db_environment is None) \
1140                            or bool(self.open_db_environment_requests) \
1141                            or bool(self.get_first_queue) \
1142                            or bool(self.get_last_queue) \
1143                            or bool(self.get_n_items_queue) \
1144                            or bool(self.get_items_range_queue) \
1145                            or bool(self.read_queue) \
1146                            or bool(self.massive_read_queue) \
1147                            or bool(self.get_all_items_queue)\
1148                            or bool(self.force_sync) \
1149                            or bool(self.deletion_cache) \
1150                            or bool(self.kv_deletion_cache) \
1151                            or bool(self.drop_db_requests) \
1152                            or bool(self.put_queue) \
1153                            or bool(self.envs_need_to_be_sync) \
1154                            or ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)
1155        return self.thrifty_in_work(result)
1156    
1157    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
1158        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
1159        if self.sync_time_interval > time_since_last_sync_time:
1160            return True, self.sync_time_interval - time_since_last_sync_time
1161        else:
1162            return True, 0
1163
1164    def _init_db_env(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvInfo:
1165        if env_id in self.db_environments:
1166            return self.db_environments[env_id]
1167
1168        if env_path is None:
1169            if env_id is None:
1170                env_path = self.root_path_to_db_environments_rel(self.default_env_name)
1171            else:
1172                env_path = self.root_path_to_db_environments_rel(to_identifier(f'{env_id}'))
1173        else:
1174            if os.path.isabs(env_path):
1175                env_path = os.path.normpath(env_path)
1176            else:
1177                env_path = self.root_path_to_db_environments_rel(env_path)
1178
1179        env_init_info: EnvInitInfo = EnvInitInfo(
1180            env_id, 
1181            env_path, 
1182            can_be_written_externally, 
1183            *args, **kwargs
1184        )
1185        env_info: EnvInfo = EnvInfo(env_init_info)
1186        self.db_environments[env_info.env_id] = env_info
1187        env_info.env.sync(True)
1188        return env_info
1189
1190    def _init_default_db_env(self):
1191        args, kwargs = default_env_path_and_params()
1192        env_info: EnvInfo = self._init_db_env(None, *args, **kwargs)
1193        self.default_db_environment = env_info
1194
1195    def _on_open_db_environment(self, request: DbRequest, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> Tuple[bool, EnvInfo, Exception]:
1196        self.open_db_environment_requests[self.current_caller_coro_info.coro_id] = (env_id, env_path, can_be_written_externally, args, kwargs)
1197        self.make_live()
1198        return False, None, None
1199    
1200    @check_request
1201    def _on_close_db_environment(self, request: DbRequest):
1202        env_id: EnvId = request.env_id
1203        
1204        if env_id is None:
1205            return True, False, DefaultDbEnvironmentCanNotBeClosedManualyError()
1206        
1207        if env_id in self.db_environments:
1208            self.db_environments[env_id].close()
1209            del self.db_environments[env_id]
1210            return True, True, None
1211        else:
1212            return True, False, UnknownEnvError(env_id)
1213
1214    def _on_set_root_path_to_db_environments(self, request: DbRequest, root_path_to_db_environments: str) -> ServiceProcessingResponse:
1215        self.root_path_to_db_environments_rel = RelativePath(root_path_to_db_environments)
1216        return True, True, None
1217    
1218    @check_request
1219    def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse:
1220        if self.put_queue:
1221            self.force_sync.add(request.env_id)
1222            self.make_live()
1223        else:
1224            # self.default_db_environment.sync(True)
1225            self.sync_in_thread_pool(request.env_id)
1226        
1227        return True, None, None
1228    
1229    @check_request
1230    def _on_get(self, request: DbRequest, key: KeyType) -> ServiceProcessingResponse:
1231        coro_id = self.current_caller_coro_info.coro_id
1232        key = self.serializer.dumps(normalize_compound_key(key))
1233        db_id = request.db_id
1234        env_id = request.env_id
1235        
1236        if env_id in self.data_cache:
1237            env_cache = self.data_cache[env_id]
1238            if db_id in env_cache:
1239                db_cache = env_cache[db_id]
1240                if key in db_cache:
1241                    values = db_cache[key]
1242                    if values:
1243                        value = values[0]
1244                        if isinstance(value, RawValueInfo):
1245                            value, _, _, _ = value
1246                        
1247                        value = self.serializer.loads(value)
1248                        return True, value, None
1249        
1250        if env_id not in self.read_queue:
1251            self.read_queue[env_id] = dict()
1252        
1253        read_queue_env = self.read_queue[env_id]
1254        if db_id not in read_queue_env:
1255            read_queue_env[db_id] = dict()
1256        
1257        read_queue_env_db = read_queue_env[db_id]
1258        if key not in read_queue_env_db:
1259            read_queue_env_db[key] = set()
1260        
1261        read_queue_env_db_key = read_queue_env_db[key]
1262        read_queue_env_db_key.add(coro_id)
1263        self.make_live()
1264        return False, None, None
1265    
1266    @check_request
1267    def _on_get_first(self, request: DbRequest) -> ServiceProcessingResponse:
1268        coro_id = self.current_caller_coro_info.coro_id
1269        db_id = request.db_id
1270        env_id = request.env_id
1271        
1272        if env_id not in self.get_first_queue:
1273            self.get_first_queue[env_id] = dict()
1274        
1275        get_first_queue_env = self.get_first_queue[env_id]
1276        if db_id not in get_first_queue_env:
1277            get_first_queue_env[db_id] = set()
1278        
1279        get_first_queue_env_db = get_first_queue_env[db_id]
1280        get_first_queue_env_db.add(coro_id)
1281        
1282        self.make_live()
1283        return False, None, None
1284    
1285    @check_request
1286    def _on_get_last(self, request: DbRequest) -> ServiceProcessingResponse:
1287        coro_id = self.current_caller_coro_info.coro_id
1288        db_id = request.db_id
1289        env_id = request.env_id
1290        
1291        if env_id not in self.get_last_queue:
1292            self.get_last_queue[env_id] = dict()
1293        
1294        get_last_queue_env = self.get_last_queue[env_id]
1295        if db_id not in get_last_queue_env:
1296            get_last_queue_env[db_id] = set()
1297        
1298        get_last_queue_env_db = get_last_queue_env[db_id]
1299        get_last_queue_env_db.add(coro_id)
1300        
1301        self.make_live()
1302        return False, None, None
1303    
1304    @check_request
1305    def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1306        coro_id = self.current_caller_coro_info.coro_id
1307        desired_key = self.serializer.dumps(normalize_compound_key(desired_key))
1308        db_id = request.db_id
1309        env_id = request.env_id
1310
1311        if env_id not in self.get_n_items_queue:
1312            self.get_n_items_queue[env_id] = dict()
1313        
1314        get_n_items_queue_env = self.get_n_items_queue[env_id]
1315        get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse)
1316        self.make_live()
1317        return False, None, None
1318    
1319    @check_request
1320    def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1321        coro_id = self.current_caller_coro_info.coro_id
1322        first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key))
1323        last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key))
1324        db_id = request.db_id
1325        env_id = request.env_id
1326
1327        if env_id not in self.get_items_range_queue:
1328            self.get_items_range_queue[env_id] = dict()
1329        
1330        get_items_range_queue_env = self.get_items_range_queue[env_id]
1331        get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse)
1332        self.make_live()
1333        return False, None, None
1334    
1335    @check_request
1336    def _on_get_items(self, request: DbRequest, db_keys: Sequence[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]:
1337        coro_id = self.current_caller_coro_info.coro_id
1338        db_id = request.db_id
1339        env_id = request.env_id
1340
1341        if env_id not in self.massive_read_queue:
1342            self.massive_read_queue[env_id] = dict()
1343        
1344        massive_read_queue_env = self.massive_read_queue[env_id]
1345        if coro_id not in massive_read_queue_env:
1346            massive_read_queue_env[coro_id] = dict()
1347        
1348        massive_read_queue_env_coro = massive_read_queue_env[coro_id]
1349        if db_id not in massive_read_queue_env_coro:
1350            massive_read_queue_env_coro[db_id] = list()
1351        
1352        massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id]
1353        for key in db_keys:
1354            massive_read_queue_env_coro_db.append(self.serializer.dumps(normalize_compound_key(key)))
1355
1356        self.make_live()
1357        return False, None, None
1358    
1359    @check_request
1360    def _on_get_all_items(self, request: DbRequest) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1361        env_id = request.env_id
1362        db_id = request.db_id
1363        coro_id = self.current_caller_coro_info.coro_id
1364        if env_id not in self.get_all_items_queue:
1365            self.get_all_items_queue[env_id] = list()
1366        
1367        self.get_all_items_queue[env_id].append((coro_id, db_id))
1368        self.make_live()
1369        return False, None, None
1370    
1371    @check_request
1372    def _on_put(self, request: DbRequest, key: KeyType, value: Union[ValueType, ValueInfo]) -> Tuple[bool, RawValueType, Optional[Exception]]:
1373        key = self.serializer.dumps(normalize_compound_key(key))
1374        db_id = request.db_id
1375        env_id = request.env_id
1376        
1377        exception = None
1378        result = None
1379        try:
1380            # self.put_queue
1381            if env_id not in self.put_queue:
1382                self.put_queue[env_id] = dict()
1383            
1384            env_put_queue = self.put_queue[env_id]
1385            if db_id not in env_put_queue:
1386                env_put_queue[db_id] = dict()
1387            
1388            env_put_queue_db = env_put_queue[db_id]
1389            if key not in env_put_queue_db:
1390                env_put_queue_db[key] = list()
1391            
1392            # self.data_cache
1393            if env_id not in self.data_cache:
1394                self.data_cache[env_id] = dict()
1395            
1396            env_data_cache = self.data_cache[env_id]
1397            if db_id not in env_data_cache:
1398                env_data_cache[db_id] = dict()
1399            
1400            env_data_cache_db = env_data_cache[db_id]
1401            if key not in env_data_cache_db:
1402                env_data_cache_db[key] = list()
1403            
1404            # both
1405            if isinstance(value, ValueInfo):
1406                result = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append)
1407                if env_put_queue_db[key] and not value.dupdata:
1408                    env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1409                else:
1410                    env_put_queue_db[key].append(result)
1411                    env_data_cache_db[key].append(result)
1412            else:
1413                result = self.serializer.dumps(value)
1414                if env_put_queue_db[key]:
1415                    env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1416                else:
1417                    env_put_queue_db[key].append(result)
1418                    env_data_cache_db[key].append(result)
1419        except:
1420            exception = get_exception()
1421        
1422        self.make_live()
1423        return True, result, exception
1424    
1425    @check_request
1426    def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1427        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1428        db_id = request.db_id
1429        env_id = request.env_id
1430
1431        # self.put_queue
1432        if env_id not in self.put_queue:
1433            self.put_queue[env_id] = dict()
1434        
1435        env_put_queue = self.put_queue[env_id]
1436        if db_id not in result_items:
1437            result_items[db_id] = dict()
1438        
1439        result_db_items = result_items[db_id]
1440        
1441        if db_id not in env_put_queue:
1442            env_put_queue[db_id] = dict()
1443        
1444        env_put_queue_db = env_put_queue[db_id]
1445
1446        # self.data_cache
1447        if env_id not in self.data_cache:
1448            self.data_cache[env_id] = dict()
1449        
1450        env_data_cache = self.data_cache[env_id]
1451        if db_id not in result_items:
1452            result_items[db_id] = dict()
1453        
1454        result_db_items = result_items[db_id]
1455        
1456        if db_id not in env_data_cache:
1457            env_data_cache[db_id] = dict()
1458        
1459        env_data_cache_db = env_data_cache[db_id]
1460
1461        # both
1462        for key, value in db_items.items():
1463            key = self.serializer.dumps(normalize_compound_key(key))
1464            
1465            exception = None
1466            result = None
1467            try:
1468                if key not in env_put_queue_db:
1469                    env_put_queue_db[key] = list()
1470                
1471                if key not in env_data_cache_db:
1472                    env_data_cache_db[key] = list()
1473                
1474                if isinstance(value, ValueInfo):
1475                    result = env_data_cache_db[key][0] = env_put_queue_db[key][0] = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append)
1476                    if env_data_cache_db[key] and not value.dupdata:
1477                        env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1478                    else:
1479                        env_put_queue_db[key].append(result)
1480                        env_data_cache_db[key].append(result)
1481                else:
1482                    result = self.serializer.dumps(value)
1483                    if env_data_cache_db[key]:
1484                        env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1485                    else:
1486                        env_put_queue_db[key].append(result)
1487                        env_data_cache_db[key].append(result)
1488            except:
1489                exception = get_exception()
1490            
1491            result_db_items[key] = (result, exception)
1492        
1493        self.make_live()
1494        return True, result_items, None
1495    
1496    @check_request
1497    def _on_delete(self, request: DbRequest, key: KeyType) -> Tuple[bool, None, Optional[Exception]]:
1498        key = self.serializer.dumps(normalize_compound_key(key))
1499        if key.startswith(EnvInfo.db_name_prefix_bytes):
1500            return True, None, KeyError(f'Can not delete special key (db info key): {key}')
1501        
1502        db_id = request.db_id
1503        env_id = request.env_id
1504        
1505        exception = None
1506        result = None
1507        try:
1508            if env_id not in self.deletion_cache:
1509                self.deletion_cache[env_id] = dict()
1510            
1511            env_deletion_cache = self.deletion_cache[env_id]
1512            if db_id not in env_deletion_cache:
1513                env_deletion_cache[db_id] = set()
1514            
1515            env_deletion_cache_db = env_deletion_cache[db_id]
1516            env_deletion_cache_db.add(key)
1517        except:
1518            exception = get_exception()
1519        
1520        self.make_live()
1521        return True, result, exception
1522    
1523    @check_request
1524    def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType) -> Tuple[bool, RawValueType, Optional[Exception]]:
1525        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1526        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1527            return True, None, KeyError(f'Can not delete special raw_key (db info raw_key): {raw_key}')
1528        
1529        db_id = request.db_id
1530        env_id = request.env_id
1531        
1532        exception = None
1533        raw_value = None
1534        try:
1535            if env_id not in self.kv_deletion_cache:
1536                self.kv_deletion_cache[env_id] = dict()
1537            
1538            env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1539            if db_id not in env_kv_deletion_cache:
1540                env_kv_deletion_cache[db_id] = dict()
1541            
1542            env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1543            if raw_key not in env_kv_deletion_cache_db:
1544                env_kv_deletion_cache_db[raw_key] = list()
1545            
1546            raw_value = self.serializer.dumps(value)
1547            env_kv_deletion_cache_db[raw_key].append(raw_value)
1548        except:
1549            exception = get_exception()
1550        
1551        self.make_live()
1552        return True, (raw_key, raw_value), exception
1553    
1554    @check_request
1555    def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]:
1556        result_items: Dict[InputKeyType, Tuple[RawKeyType, Optional[Exception]]] = dict()
1557        db_id = request.db_id
1558        env_id = request.env_id
1559
1560        if env_id not in self.deletion_cache:
1561            self.deletion_cache[env_id] = dict()
1562        
1563        env_deletion_cache = self.deletion_cache[env_id]
1564        if db_id not in env_deletion_cache:
1565            env_deletion_cache[db_id] = set()
1566        
1567        env_deletion_cache_db = env_deletion_cache[db_id]
1568        for key in db_items:
1569            exception = None
1570            try:
1571                raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1572                if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1573                    exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
1574                else:
1575                    env_deletion_cache_db.add(raw_key)
1576            except:
1577                exception = get_exception()
1578            
1579            result_items[key] = (raw_key, exception)
1580
1581        self.make_live()
1582        return True, result_items, None
1583    
1584    @check_request
1585    def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1586        result_items: Dict[InputKeyType, Tuple[RawKeyType, RawValueType, Optional[Exception]]] = dict()
1587        db_id = request.db_id
1588        env_id = request.env_id
1589
1590        if env_id not in self.kv_deletion_cache:
1591            self.kv_deletion_cache[env_id] = dict()
1592        
1593        env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1594        if db_id not in env_kv_deletion_cache:
1595            env_kv_deletion_cache[db_id] = dict()
1596        
1597        env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1598        for key, value in db_items.items():
1599            exception = None
1600            raw_key = None
1601            raw_value = None
1602            try:
1603                raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1604                if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1605                    exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
1606                else:
1607                    if raw_key not in env_kv_deletion_cache_db:
1608                        env_kv_deletion_cache_db[raw_key] = list()
1609                    
1610                    raw_value = self.serializer.dumps(value)
1611                    env_kv_deletion_cache_db[raw_key].append(self.serializer.dumps(value))
1612            except:
1613                exception = get_exception()
1614            
1615            result_items[key] = (raw_key, raw_value, exception)
1616        
1617        self.make_live()
1618        return True, result_items, None
1619    
1620    def _on_open_databases(self, request: DbRequest, db_ids: Set[DbId], *args, **kwargs) -> ServiceProcessingResponse:
1621        try:
1622            env_info: EnvInfo = self.db_environments[request.env_id]
1623        except KeyError:
1624            exception = UnknownEnvError(request.env_id)
1625            return True, None, exception
1626        
1627        for db_id in db_ids:
1628            env_info.open_db(db_id, *args, **kwargs)
1629        
1630        env_info.env.sync(True)
1631        return True, None, None
1632    
1633    @check_request
1634    def _on_drop_db(self, request: DbRequest, delete: bool = False) -> ServiceProcessingResponse:
1635        coro_id = self.current_caller_coro_info.coro_id
1636        db_id = request.db_id
1637        env_id = request.env_id
1638
1639        if env_id not in self.drop_db_requests:
1640            self.drop_db_requests[env_id] = dict()
1641        
1642        drop_db_requests_env = self.drop_db_requests[env_id]
1643
1644        if db_id not in drop_db_requests_env:
1645            drop_db_requests_env[db_id] = (False, set())
1646        
1647        drop_db_requests_env_db: Tuple[bool, Set[CoroID]] = drop_db_requests_env[db_id]
1648
1649        if delete and (not drop_db_requests_env_db[0]):
1650            drop_db_requests_env_db = (True, drop_db_requests_env_db[1])
1651        
1652        drop_db_requests_env_db[1].add(coro_id)
1653        self.make_live()
1654        return False, None, None
1655    
    def sync_in_thread_pool(self, env_id: EnvId = None):
        """Start a background coroutine that calls ``env.sync(True)`` for one environment in a thread pool.

        Before returning, ``env_id`` is added to ``self.write_locked`` and
        ``self.envs_in_sync`` and the new coroutine's id to ``self.write_locked_coro_id``.
        These marks are removed again by the worker / coroutine once the sync has finished.

        Args:
            env_id: id of the environment to sync; it is looked up in
                ``self.db_environments`` inside the coroutine (``None`` is the id used
                for the default environment elsewhere in this class).
        """
        async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
            # Get hold of an asyncio loop: either ask the AsyncioLoop service to ensure one
            # exists, or fetch the current one if the caller could not supply it.
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
            else:
                if asyncio_loop is None:
                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
            
            async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
                def sync_worker():
                    # Passed to task_in_thread_pool; the write lock mark for this
                    # environment is dropped right after the sync call returns.
                    # NOTE(review): True is presumably lmdb's `force` flag - confirm.
                    env.sync(True)
                    self.write_locked.discard(env_id)
                
                await task_in_thread_pool(asyncio_loop, sync_worker)

            env: lmdb.Environment = self.db_environments[env_id].env
            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop, env)))
            # Sync finished: clear the remaining bookkeeping set up by the outer method.
            self.write_locked_coro_id.discard(i.coro_id)
            self.envs_in_sync.discard(env_id)
            def make_service_live_for_a_next_sync(self: 'Db'):
                self.make_live()
                self.wake_up_handle_registered = False
            
            # Register at most one timer callback at a time; it makes the service live
            # again after self.sync_time_interval and clears the flag.
            if not self.wake_up_handle_registered:
                self.wake_up_handle_registered = True
                await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

        asyncio_loop = None
        need_to_ensure_asyncio_loop = False
        try:
            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
        except AsyncioLoopWasNotSetError:
            # No loop set yet: let the coroutine request one through the service.
            need_to_ensure_asyncio_loop = True

        coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
        coro.is_background_coro = True
        self.write_locked.add(env_id)
        self.write_locked_coro_id.add(coro.coro_id)
        self.envs_in_sync.add(env_id)
1695
1696
# Bind DbRequest to the Db service class. Presumably this lets callers issue a DbRequest
# without naming the service explicitly - confirm against the scheduler's request dispatch.
DbRequest.default_service_type = Db
1698
1699
def lmdb_reapplier(env_info: EnvInfo, handler: Callable, *args, **kwargs):
    """Run ``handler(env_info, *args, **kwargs)``, growing the LMDB map and retrying while it is full.

    When the handler raises a ``DBError`` whose ``original_exception`` is an
    ``lmdb.MapFullError``, the environment's map size is increased by 2 MiB and the
    handler is called again. The loop ends when the handler returns normally.

    Raises:
        DBError: re-raised unchanged when its ``original_exception`` is not an
            ``lmdb.MapFullError``; enlarging the map cannot help in that case, and
            retrying would spin forever.
    """
    environment: lmdb.Environment = env_info.env
    while True:
        try:
            handler(env_info, *args, **kwargs)
            return
        except DBError as err:
            if not isinstance(err.original_exception, lmdb.MapFullError):
                raise

        # Map is full: enlarge it by 2 MiB and try the handler again.
        environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
default_env_path_and_params: cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs = <cengal.code_flow_control.args_manager.versions.v_0.args_manager.ArgsKwargs object>
class Db(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
 503class Db(Service, EntityStatsMixin):
    def __init__(self, loop: CoroSchedulerType):
        """Set up empty request queues, caches, counters and the request dispatch table.

        No environment is opened here; ``self.default_db_environment`` and
        ``self.root_path_to_db_environments_rel`` start as ``None``.
        """
        super(Db, self).__init__(loop)
        self.default_env_name: str = '__default__.dbenv'
        self.default_envs_dir: str = 'db_envs'
        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
        # Per env / db: (delete flag, ids of coroutines waiting for the drop).
        self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict()
        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
        # Per env / db / raw key: ids of coroutines waiting for that key.
        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict()
        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
        # Staged writes, and the cache that the put handlers fill alongside them.
        self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
        self.max_data_cache_size: Union[None, int] = 10000
        # Staged deletions: whole keys, and specific key/value pairs.
        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict()
        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
        # self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
        self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict()
        self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = dict()
        self.root_path_to_db_environments_rel: Optional[RelativePath] = None
        self.app_name_waiter: Optional[CoroWrapperBase] = None
        # Holds the EnvInfo returned by _init_db_env (not a bare lmdb.Environment).
        self.default_db_environment: Optional[EnvInfo] = None
        self.db_environments: Dict[EnvId, EnvInfo] = dict()
        # self.databases: Dict[Hashable, Any] = dict()
        # self.db_names: Dict[DbId, DbName] = dict()
        self.async_loop = None
        self.sync_time_interval = 1.0
        self.last_sync_time = perf_counter()
        # Both sets are filled with environment ids (see force_sync / sync_in_thread_pool).
        self.envs_need_to_be_sync: Set[EnvId] = set()
        self.envs_in_sync: Set[EnvId] = set()
        self.force_sync: Set[EnvId] = set()
        self.sync_an_each_write: bool = False
        self.write_locked: Set[EnvId] = set()
        self.writes_num: int = 0
        self.reads_num: int = 0
        self.deletes_num: int = 0
        self.db_drops_num: int = 0
        self.write_locked_coro_id: Set[CoroID] = set()
        self.wake_up_handle_registered: bool = False
        # self.serializer = best_serializer_for_standard_data((
        #                                     DataFormats.binary,
        #                                     DataFormats.messagepack,
        #                                     Tags.can_use_bytes,
        #                                     Tags.decode_str_as_str,
        #                                     Tags.decode_list_as_list,
        #                                     Tags.decode_bytes_as_bytes,
        #                                     Tags.superficial,
        #                                     Tags.current_platform,
        #                                     Tags.multi_platform,
        #                                 ),
        #                                 TestDataType.small,
        #                                 0.1)
        self.serializer = Serializer(Serializers.msgspec_messagepack)
        # Dispatch table: integer request type -> handler method.
        # NOTE(review): the numbers presumably match the request ids produced by DbRequest - confirm.
        self._request_workers = {
            0: self._on_set_root_path_to_db_environments,
            1: self._on_open_databases,
            2: self._on_drop_db,
            3: self._on_sync,
            4: self._on_get,
            5: self._on_get_items,
            6: self._on_get_all_items,
            7: self._on_put,
            8: self._on_put_items,
            9: self._on_delete,
            10: self._on_delete_kv,
            11: self._on_delete_items,
            12: self._on_delete_kv_items,
            13: self._on_open_db_environment,
            14: self._on_get_first,
            15: self._on_get_last,
            16: self._on_get_n_items,
            17: self._on_get_items_range,
            18: self._on_close_db_environment,
        }
 584
 585    # TODO: sync with last implementation
 586    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 587        return type(self).__name__, {
 588            'db_env_ids': list(self.db_environments.keys()),
 589            'writes num': self.writes_num,
 590            'reads num': self.reads_num,
 591            'deletes num': self.deletes_num,
 592            'db drops num': self.db_drops_num,
 593        }
 594    
 595    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
 596        return normalize_compound_key(key)
 597
 598    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
 599        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
 600        if isinstance(env_or_id, lmdb.Environment):
 601            env = env_or_id
 602        else:
 603            env = self.db_environments[env_or_id].env
 604        
 605        if len(raw_key) > env.max_key_size():
 606            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
 607
 608    def destroy(self):
 609        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
 610        # sync envirounments and close them. Also we need to prevent new requests from being processed.
 611        db_environments_values = self.db_environments.values()
 612        self.db_environments = type(self.db_environments)()
 613        self.default_db_environment = None
 614        for env_info in db_environments_values:
 615            env_info.close()
 616
 617    def single_task_registration_or_immediate_processing(
 618            self, *args, **kwargs) -> ServiceProcessingResponse:
 619        result = self.try_resolve_request(*args, **kwargs)
 620        if result is None:
 621            self._ensure_default_db_environment()
 622            return True, None, None
 623        else:
 624            return result
 625
 626    def _ensure_default_db_environment(self) -> bool:
 627        if self.default_db_environment is None:
 628            if self.root_path_to_db_environments_rel is None:
 629                if self.app_name_waiter is None:
 630                    async def coro(i: Interface, self: 'Db'):
 631                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
 632                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
 633                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
 634                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
 635                        self.root_path_to_db_environments_rel = RelativePath(RelativePath(app_data_dir_path)(self.default_envs_dir))
 636                        self._init_default_db_env()
 637                        self.app_name_waiter = None
 638                        self.make_live()
 639                    
 640                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
 641                    # self.app_name_waiter.is_background_coro = True
 642                
 643                self.make_dead()
 644                return False
 645            else:
 646                self._init_default_db_env()
 647                return True
 648        else:
 649            return True
 650
 651    def full_processing_iteration(self):
 652        # TODO: combine all queues into one single transaction by env_id. Or at most two transactions: read and write.
 653        # This will improve performance and will give ability to move transations to other threads or even processess if needed.
 654
 655        # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 
 656        # them at most as once a milisecond (it is frequently engough).
 657
 658        if not self._ensure_default_db_environment():
 659            return
 660        
 661        if self.force_sync:
 662            self.envs_need_to_be_sync |= self.force_sync
 663            self.force_sync = set()
 664        
 665        put_queue_buff = self.put_queue
 666        self.put_queue = type(put_queue_buff)()
 667        
 668        data_cache_buff = self.data_cache  # will be cleared at the end of the iteration if necessary
 669        
 670        read_queue_buff = self.read_queue
 671        self.read_queue = type(read_queue_buff)()
 672        
 673        massive_read_queue_buff = self.massive_read_queue
 674        self.massive_read_queue = type(massive_read_queue_buff)()
 675        
 676        deletion_cache_buff = self.deletion_cache
 677        self.deletion_cache = type(deletion_cache_buff)()
 678        
 679        kv_deletion_cache_buff = self.kv_deletion_cache
 680        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
 681        
 682        get_all_items_queue_buff = self.get_all_items_queue
 683        self.get_all_items_queue = type(get_all_items_queue_buff)()
 684
 685        get_first_queue_buff = self.get_first_queue
 686        self.get_first_queue = type(get_first_queue_buff)()
 687
 688        get_last_queue_buff = self.get_last_queue
 689        self.get_last_queue = type(get_last_queue_buff)()
 690
 691        get_n_items_queue_buff = self.get_n_items_queue
 692        self.get_n_items_queue = type(get_n_items_queue_buff)()
 693
 694        get_items_range_queue_buff = self.get_items_range_queue
 695        self.get_items_range_queue = type(get_items_range_queue_buff)()
 696
 697        open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests
 698        self.open_db_environment_requests = type(open_db_environment_requests_buff)()
 699
 700        # open_db_environment
 701        for coro_id, request_info in open_db_environment_requests_buff.items():
 702            env_id, env_path, can_be_written_externally, args, kwargs = request_info
 703            if env_id in self.db_environments:
 704                self.register_response(coro_id, self.db_environments[env_id])
 705            else:
 706                exception = None
 707                try:
 708                    result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs)
 709                except:
 710                    exception = get_exception()
 711
 712                self.register_response(coro_id, result, exception)
 713
 714        # put
 715        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]):
 716            try:
 717                with env_info.env.begin(write=True) as txn:
 718                    for db_id, db_put_info in put_info.items():
 719                        if db_id in env_info.databases:
 720                            for raw_key, values in db_put_info.items():
 721                                for value in values:
 722                                    if isinstance(value, RawValueInfo):
 723                                        value, dupdata, overwrite, append = value
 724                                    else:
 725                                        dupdata, overwrite, append = True, True, False
 726                                    
 727                                    txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append)
 728                        
 729                        self.writes_num += len(db_put_info)
 730            except lmdb.MapFullError:
 731                raise DBError.from_exception(db_id)
 732        
 733        for env_id, put_info in put_queue_buff.items():
 734            if env_id in self.db_environments:
 735                self.envs_need_to_be_sync.add(env_id)
 736                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
 737        
 738        # TODO: implement replace* methods processing
 739
 740        # delete
 741        for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items():
 742            if env_id in self.db_environments:
 743                env_info = self.db_environments[env_id]
 744                self.envs_need_to_be_sync.add(env_id)
 745                with env_info.env.begin(write=True) as txn:
 746                    for db_id, del_keys in deletion_cache_buff_db_info.items():
 747                        if db_id in env_info.databases:
 748                            for del_raw_key in del_keys:
 749                                txn.delete(del_raw_key, db=env_info.databases[db_id])
 750                                self.deletes_num += 1
 751                                try:
 752                                    data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 753                                except KeyError:
 754                                    pass
 755
 756        # delete_kv
 757        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
 758            if env_id in self.db_environments:
 759                env_info = self.db_environments[env_id]
 760                self.envs_need_to_be_sync.add(env_id)
 761                with env_info.env.begin(write=True) as txn:
 762                    for db_id, del_keys in kv_deletion_cache_buff_db_info.items():
 763                        if db_id in env_info.databases:
 764                            for del_raw_key, del_raw_values in del_keys.items():
 765                                for del_raw_value in del_raw_values:
 766                                    txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id])
 767                                    self.deletes_num += 1
 768                                    try:
 769                                        raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key]
 770                                        new_raw_values: List[Union[RawValueType, RawValueInfo]] = list()
 771                                        for raw_value in raw_values:
 772                                            raw_value_original = raw_value
 773                                            if isinstance(raw_value, RawValueInfo):
 774                                                raw_value, _, _, _ = raw_value
 775                                            
 776                                            if raw_value != del_raw_value:
 777                                                new_raw_values.append(raw_value_original)
 778                                            
 779                                        if new_raw_values:
 780                                            data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values
 781                                        else:
 782                                            data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 783                                    except KeyError:
 784                                        pass
 785
 786        # drop
 787        drop_db_requests_buff = self.drop_db_requests
 788        self.drop_db_requests = type(drop_db_requests_buff)()
 789        
 790        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]):
 791            for db_id, db_drop_info in drop_info.items():
 792                delete_db, coro_id = db_drop_info
 793                if db_id in env_info.databases:
 794                    try:
 795                        with env_info.env.begin(write=True) as txn:
 796                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
 797                            if delete_db:
 798                                del env_info.databases[db_id]
 799                                del env_info.db_names[db_id]
 800                        
 801                        self.db_drops_num += 1
 802                    except lmdb.MapFullError:
 803                        raise DBError.from_exception(db_id)
 804                    
 805                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 806                else:
 807                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 808        
 809        for env_id, drop_info in drop_db_requests_buff.items():
 810            if env_id in self.db_environments:
 811                self.envs_need_to_be_sync.add(env_id)
 812                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
 813            else:
 814                for db_id, db_drop_info in drop_info.items():
 815                    delete_db, coro_id = db_drop_info
 816                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 817
 818        # get
 819        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]:
 820            key, db_id, env_id = key_info
 821            need_to_get_from_db = True
 822            try:
 823                values = data_cache_buff[env_id][db_id][key]
 824                if values:
 825                    value = values[0]
 826                    if isinstance(value, RawValueInfo):
 827                        value, _, _, _ = value
 828
 829                    need_to_get_from_db = False
 830            except KeyError:
 831                pass
 832            
 833            if need_to_get_from_db:
 834                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
 835                self.reads_num += 1
 836            
 837            exception = None
 838            try:
 839                if value is None:
 840                    exception = DbKeyError(key_info)
 841                else:
 842                    value = self.serializer.loads(value)
 843            except:
 844                exception = get_exception()
 845            
 846            return value, exception
 847        
 848        # _on_get
 849        for env_id, read_queue_buff_db_info in read_queue_buff.items():
 850            if env_id in self.db_environments:
 851                env_info = self.db_environments[env_id]
 852                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 853                    if db_id in env_info.databases:
 854                        with env_info.env.begin() as txn:
 855                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 856                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 857                                for coro_id in coro_ids:
 858                                    self.register_response(coro_id, value, exception)
 859                    else:
 860                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 861                            for coro_id in coro_ids:
 862                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 863            else:
 864                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 865                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 866                        for coro_id in coro_ids:
 867                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 868        
 869        # _on_get_items
 870        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
 871        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
 872            if env_id in self.db_environments:
 873                env_info = self.db_environments[env_id]
 874                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 875                    if coro_id not in results:
 876                        results[coro_id] = dict()
 877                    
 878                    coro_results = results[coro_id]
 879                    for db_id, raw_keys in read_queue_buff_db_info.items():
 880                        if db_id not in coro_results:
 881                            coro_results[db_id] = OrderedDict()
 882                        
 883                        coro_db_results = coro_results[db_id]
 884                        if db_id in env_info.databases:
 885                            with env_info.env.begin() as txn:
 886                                for raw_key in raw_keys:
 887                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 888                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
 889                        else:
 890                            for coro_id in coro_ids:
 891                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 892            else:
 893                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 894                    for db_id, raw_keys in read_queue_buff_db_info.items():
 895                        for coro_id in coro_ids:
 896                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 897        
 898        for coro_id, coro_results in results.items():
 899            if coro_results:
 900                db_id = tuple(coro_results.keys())[0]
 901                self.register_response(coro_id, coro_results[db_id], None)
 902            else:
 903                self.register_response(coro_id, OrderedDict(), None)
 904
 905        # get all items
 906        for env_id, requests_info in get_all_items_queue_buff.items():
 907            for request_info in requests_info:
 908                coro_id, db_id = request_info
 909                if env_id in self.db_environments:
 910                    env_info = self.db_environments[env_id]
 911                    env = env_info.env
 912                    if db_id in env_info.databases:
 913                        db = env_info.databases[db_id]
 914                        with env.begin(db=db) as txn:
 915                            result = list()
 916                            exception = None
 917                            try:
 918                                result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)]
 919                                self.reads_num += len(result)
 920                            except:
 921                                exception = get_exception()
 922                            
 923                            self.register_response(coro_id, result, exception)
 924                    else:
 925                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 926                else:
 927                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 928        
 929        # get_first
 930        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
 931            if env_id in self.db_environments:
 932                env_info = self.db_environments[env_id]
 933                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 934                    if db_id in env_info.databases:
 935                        db = env_info.databases[db_id]
 936                        with env_info.env.begin(db=db) as txn:
 937                            result = None
 938                            exception = None
 939                            try:
 940                                cursor: lmdb.Cursor = txn.cursor()
 941                                if cursor.first():
 942                                    key = cursor.key()
 943                                    key_found: bool = True
 944                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 945                                        key_found = cursor.next_nodup()
 946                                        if not key_found:
 947                                            break
 948
 949                                        key = cursor.key()
 950
 951                                    if key_found:
 952                                        value = cursor.value()
 953                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 954                                        self.reads_num += 1
 955                                    else:
 956                                        exception = KeyError()
 957                                else:
 958                                    exception = KeyError()
 959                            except:
 960                                exception = get_exception()
 961                            
 962                            for coro_id in coro_ids:
 963                                self.register_response(coro_id, result, exception)
 964                    else:
 965                        for coro_id in coro_ids:
 966                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 967            else:
 968                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 969                    for coro_id in coro_ids:
 970                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 971        
 972        # get_last
 973        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
 974            if env_id in self.db_environments:
 975                env_info = self.db_environments[env_id]
 976                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 977                    if db_id in env_info.databases:
 978                        db = env_info.databases[db_id]
 979                        with env_info.env.begin(db=db) as txn:
 980                            result = None
 981                            exception = None
 982                            try:
 983                                cursor: lmdb.Cursor = txn.cursor()
 984                                if cursor.last():
 985                                    key: bytes = cursor.key()
 986                                    key_found: bool = True
 987                                    key = cursor.key()
 988                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 989                                        key_found = cursor.prev_nodup()
 990                                        if not key_found:
 991                                            break
 992
 993                                        key = cursor.key()
 994
 995                                    if key_found:
 996                                        value: bytes = cursor.value()
 997                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 998                                        self.reads_num += 1
 999                                    else:
1000                                        exception = KeyError()
1001                                else:
1002                                    exception = KeyError()
1003                            except:
1004                                exception = get_exception()
1005                            
1006                            for coro_id in coro_ids:
1007                                self.register_response(coro_id, result, exception)
1008                    else:
1009                        for coro_id in coro_ids:
1010                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1011            else:
1012                for db_id, coro_ids in get_last_queue_buff_db_info.items():
1013                    for coro_id in coro_ids:
1014                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1015        
1016        # get_n_items
1017        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
1018            if env_id in self.db_environments:
1019                env_info = self.db_environments[env_id]
1020                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1021                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1022                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1023                    if db_id in env_info.databases:
1024                        db = env_info.databases[db_id]
1025                        with env_info.env.begin(db=db) as txn:
1026                            exception = None
1027                            try:
1028                                cursor: lmdb.Cursor = txn.cursor()
1029                                if cursor.set_range(first_desired_raw_key):
1030                                    if reverse:
1031                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1032                                    else:
1033                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1034                                    
1035                                    for raw_key, raw_value in cursor_iterator:
1036                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1037                                            continue
1038
1039                                        if (num_items is not None) and (num_items <= 0):
1040                                            break
1041                                        
1042                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1043                                        self.reads_num += 1
1044                                        if num_items is not None:
1045                                            num_items -= 1
1046                                else:
1047                                    exception = KeyError()
1048                            except:
1049                                exception = get_exception()
1050                            
1051                            self.register_response(coro_id, coro_results, exception)
1052                    else:
1053                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1054            else:
1055                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1056                    for coro_id in coro_ids:
1057                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1058        
1059        # get_items_range
1060        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
1061            if env_id in self.db_environments:
1062                env_info = self.db_environments[env_id]
1063                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1064                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1065                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1066                    if db_id in env_info.databases:
1067                        db = env_info.databases[db_id]
1068                        with env_info.env.begin(db=db) as txn:
1069                            exception = None
1070                            try:
1071                                cursor: lmdb.Cursor = txn.cursor()
1072                                if cursor.set_range(first_desired_raw_key):
1073                                    if reverse:
1074                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1075                                    else:
1076                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1077                                    
1078                                    for raw_key, raw_value in cursor_iterator:
1079                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1080                                            continue
1081                                        
1082                                        if reverse:
1083                                            if raw_key < last_desired_raw_key:
1084                                                break
1085                                        else:
1086                                            if raw_key > last_desired_raw_key:
1087                                                break
1088
1089                                        if (num_items is not None) and (num_items <= 0):
1090                                            break
1091
1092                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1093                                        self.reads_num += 1
1094                                        if num_items is not None:
1095                                            num_items -= 1
1096                                else:
1097                                    exception = KeyError()
1098                            except:
1099                                exception = get_exception()
1100                            
1101                            self.register_response(coro_id, coro_results, exception)
1102                    else:
1103                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1104            else:
1105                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1106                    for coro_id in coro_ids:
1107                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1108        
1109        need_to_sync = self.sync_an_each_write
1110
1111        # periodic sync
1112        if (perf_counter() - self.last_sync_time) >= self.sync_time_interval:
1113            for env_id, env_info in self.db_environments.items():
1114                self.envs_need_to_be_sync.add(env_id)
1115                need_to_sync = True
1116
1117        # sync
1118        if need_to_sync and self.envs_need_to_be_sync:
1119            envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync
1120            self.envs_need_to_be_sync = set(self.envs_in_sync)
1121            for env_id in envs_need_to_be_sync_bak:
1122                self.sync_in_thread_pool(env_id)
1123            
1124            self.last_sync_time = perf_counter()
1125        
1126        # invalidate data_cache for envs that can be written externally
1127        for env_id, env_info in self.db_environments.items():
1128            if env_info.init_info.can_be_written_externally:
1129                self.data_cache.pop(env_id, None)
1130        
1131        # clear too big caches
1132        for env_id, env_info in self.db_environments.items():
1133            if env_id in self.data_cache:
1134                if len(self.data_cache[env_id]) > self.max_data_cache_size:
1135                    del self.data_cache[env_id]
1136
1137        self.make_dead()
1138
1139    def in_work(self) -> bool:
1140        result: bool = bool(self.default_db_environment is None) \
1141                            or bool(self.open_db_environment_requests) \
1142                            or bool(self.get_first_queue) \
1143                            or bool(self.get_last_queue) \
1144                            or bool(self.get_n_items_queue) \
1145                            or bool(self.get_items_range_queue) \
1146                            or bool(self.read_queue) \
1147                            or bool(self.massive_read_queue) \
1148                            or bool(self.get_all_items_queue)\
1149                            or bool(self.force_sync) \
1150                            or bool(self.deletion_cache) \
1151                            or bool(self.kv_deletion_cache) \
1152                            or bool(self.drop_db_requests) \
1153                            or bool(self.put_queue) \
1154                            or bool(self.envs_need_to_be_sync) \
1155                            or ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)
1156        return self.thrifty_in_work(result)
1157    
1158    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
1159        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
1160        if self.sync_time_interval > time_since_last_sync_time:
1161            return True, self.sync_time_interval - time_since_last_sync_time
1162        else:
1163            return True, 0
1164
1165    def _init_db_env(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvInfo:
1166        if env_id in self.db_environments:
1167            return self.db_environments[env_id]
1168
1169        if env_path is None:
1170            if env_id is None:
1171                env_path = self.root_path_to_db_environments_rel(self.default_env_name)
1172            else:
1173                env_path = self.root_path_to_db_environments_rel(to_identifier(f'{env_id}'))
1174        else:
1175            if os.path.isabs(env_path):
1176                env_path = os.path.normpath(env_path)
1177            else:
1178                env_path = self.root_path_to_db_environments_rel(env_path)
1179
1180        env_init_info: EnvInitInfo = EnvInitInfo(
1181            env_id, 
1182            env_path, 
1183            can_be_written_externally, 
1184            *args, **kwargs
1185        )
1186        env_info: EnvInfo = EnvInfo(env_init_info)
1187        self.db_environments[env_info.env_id] = env_info
1188        env_info.env.sync(True)
1189        return env_info
1190
1191    def _init_default_db_env(self):
1192        args, kwargs = default_env_path_and_params()
1193        env_info: EnvInfo = self._init_db_env(None, *args, **kwargs)
1194        self.default_db_environment = env_info
1195
1196    def _on_open_db_environment(self, request: DbRequest, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> Tuple[bool, EnvInfo, Exception]:
1197        self.open_db_environment_requests[self.current_caller_coro_info.coro_id] = (env_id, env_path, can_be_written_externally, args, kwargs)
1198        self.make_live()
1199        return False, None, None
1200    
1201    @check_request
1202    def _on_close_db_environment(self, request: DbRequest):
1203        env_id: EnvId = request.env_id
1204        
1205        if env_id is None:
1206            return True, False, DefaultDbEnvironmentCanNotBeClosedManualyError()
1207        
1208        if env_id in self.db_environments:
1209            self.db_environments[env_id].close()
1210            del self.db_environments[env_id]
1211            return True, True, None
1212        else:
1213            return True, False, UnknownEnvError(env_id)
1214
1215    def _on_set_root_path_to_db_environments(self, request: DbRequest, root_path_to_db_environments: str) -> ServiceProcessingResponse:
1216        self.root_path_to_db_environments_rel = RelativePath(root_path_to_db_environments)
1217        return True, True, None
1218    
1219    @check_request
1220    def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse:
1221        if self.put_queue:
1222            self.force_sync.add(request.env_id)
1223            self.make_live()
1224        else:
1225            # self.default_db_environment.sync(True)
1226            self.sync_in_thread_pool(request.env_id)
1227        
1228        return True, None, None
1229    
1230    @check_request
1231    def _on_get(self, request: DbRequest, key: KeyType) -> ServiceProcessingResponse:
1232        coro_id = self.current_caller_coro_info.coro_id
1233        key = self.serializer.dumps(normalize_compound_key(key))
1234        db_id = request.db_id
1235        env_id = request.env_id
1236        
1237        if env_id in self.data_cache:
1238            env_cache = self.data_cache[env_id]
1239            if db_id in env_cache:
1240                db_cache = env_cache[db_id]
1241                if key in db_cache:
1242                    values = db_cache[key]
1243                    if values:
1244                        value = values[0]
1245                        if isinstance(value, RawValueInfo):
1246                            value, _, _, _ = value
1247                        
1248                        value = self.serializer.loads(value)
1249                        return True, value, None
1250        
1251        if env_id not in self.read_queue:
1252            self.read_queue[env_id] = dict()
1253        
1254        read_queue_env = self.read_queue[env_id]
1255        if db_id not in read_queue_env:
1256            read_queue_env[db_id] = dict()
1257        
1258        read_queue_env_db = read_queue_env[db_id]
1259        if key not in read_queue_env_db:
1260            read_queue_env_db[key] = set()
1261        
1262        read_queue_env_db_key = read_queue_env_db[key]
1263        read_queue_env_db_key.add(coro_id)
1264        self.make_live()
1265        return False, None, None
1266    
1267    @check_request
1268    def _on_get_first(self, request: DbRequest) -> ServiceProcessingResponse:
1269        coro_id = self.current_caller_coro_info.coro_id
1270        db_id = request.db_id
1271        env_id = request.env_id
1272        
1273        if env_id not in self.get_first_queue:
1274            self.get_first_queue[env_id] = dict()
1275        
1276        get_first_queue_env = self.get_first_queue[env_id]
1277        if db_id not in get_first_queue_env:
1278            get_first_queue_env[db_id] = set()
1279        
1280        get_first_queue_env_db = get_first_queue_env[db_id]
1281        get_first_queue_env_db.add(coro_id)
1282        
1283        self.make_live()
1284        return False, None, None
1285    
1286    @check_request
1287    def _on_get_last(self, request: DbRequest) -> ServiceProcessingResponse:
1288        coro_id = self.current_caller_coro_info.coro_id
1289        db_id = request.db_id
1290        env_id = request.env_id
1291        
1292        if env_id not in self.get_last_queue:
1293            self.get_last_queue[env_id] = dict()
1294        
1295        get_last_queue_env = self.get_last_queue[env_id]
1296        if db_id not in get_last_queue_env:
1297            get_last_queue_env[db_id] = set()
1298        
1299        get_last_queue_env_db = get_last_queue_env[db_id]
1300        get_last_queue_env_db.add(coro_id)
1301        
1302        self.make_live()
1303        return False, None, None
1304    
1305    @check_request
1306    def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1307        coro_id = self.current_caller_coro_info.coro_id
1308        desired_key = self.serializer.dumps(normalize_compound_key(desired_key))
1309        db_id = request.db_id
1310        env_id = request.env_id
1311
1312        if env_id not in self.get_n_items_queue:
1313            self.get_n_items_queue[env_id] = dict()
1314        
1315        get_n_items_queue_env = self.get_n_items_queue[env_id]
1316        get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse)
1317        self.make_live()
1318        return False, None, None
1319    
1320    @check_request
1321    def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1322        coro_id = self.current_caller_coro_info.coro_id
1323        first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key))
1324        last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key))
1325        db_id = request.db_id
1326        env_id = request.env_id
1327
1328        if env_id not in self.get_items_range_queue:
1329            self.get_items_range_queue[env_id] = dict()
1330        
1331        get_items_range_queue_env = self.get_items_range_queue[env_id]
1332        get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse)
1333        self.make_live()
1334        return False, None, None
1335    
1336    @check_request
1337    def _on_get_items(self, request: DbRequest, db_keys: Sequence[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]:
1338        coro_id = self.current_caller_coro_info.coro_id
1339        db_id = request.db_id
1340        env_id = request.env_id
1341
1342        if env_id not in self.massive_read_queue:
1343            self.massive_read_queue[env_id] = dict()
1344        
1345        massive_read_queue_env = self.massive_read_queue[env_id]
1346        if coro_id not in massive_read_queue_env:
1347            massive_read_queue_env[coro_id] = dict()
1348        
1349        massive_read_queue_env_coro = massive_read_queue_env[coro_id]
1350        if db_id not in massive_read_queue_env_coro:
1351            massive_read_queue_env_coro[db_id] = list()
1352        
1353        massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id]
1354        for key in db_keys:
1355            massive_read_queue_env_coro_db.append(self.serializer.dumps(normalize_compound_key(key)))
1356
1357        self.make_live()
1358        return False, None, None
1359    
1360    @check_request
1361    def _on_get_all_items(self, request: DbRequest) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
1362        env_id = request.env_id
1363        db_id = request.db_id
1364        coro_id = self.current_caller_coro_info.coro_id
1365        if env_id not in self.get_all_items_queue:
1366            self.get_all_items_queue[env_id] = list()
1367        
1368        self.get_all_items_queue[env_id].append((coro_id, db_id))
1369        self.make_live()
1370        return False, None, None
1371    
1372    @check_request
1373    def _on_put(self, request: DbRequest, key: KeyType, value: Union[ValueType, ValueInfo]) -> Tuple[bool, RawValueType, Optional[Exception]]:
1374        key = self.serializer.dumps(normalize_compound_key(key))
1375        db_id = request.db_id
1376        env_id = request.env_id
1377        
1378        exception = None
1379        result = None
1380        try:
1381            # self.put_queue
1382            if env_id not in self.put_queue:
1383                self.put_queue[env_id] = dict()
1384            
1385            env_put_queue = self.put_queue[env_id]
1386            if db_id not in env_put_queue:
1387                env_put_queue[db_id] = dict()
1388            
1389            env_put_queue_db = env_put_queue[db_id]
1390            if key not in env_put_queue_db:
1391                env_put_queue_db[key] = list()
1392            
1393            # self.data_cache
1394            if env_id not in self.data_cache:
1395                self.data_cache[env_id] = dict()
1396            
1397            env_data_cache = self.data_cache[env_id]
1398            if db_id not in env_data_cache:
1399                env_data_cache[db_id] = dict()
1400            
1401            env_data_cache_db = env_data_cache[db_id]
1402            if key not in env_data_cache_db:
1403                env_data_cache_db[key] = list()
1404            
1405            # both
1406            if isinstance(value, ValueInfo):
1407                result = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append)
1408                if env_put_queue_db[key] and not value.dupdata:
1409                    env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1410                else:
1411                    env_put_queue_db[key].append(result)
1412                    env_data_cache_db[key].append(result)
1413            else:
1414                result = self.serializer.dumps(value)
1415                if env_put_queue_db[key]:
1416                    env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1417                else:
1418                    env_put_queue_db[key].append(result)
1419                    env_data_cache_db[key].append(result)
1420        except:
1421            exception = get_exception()
1422        
1423        self.make_live()
1424        return True, result, exception
1425    
1426    @check_request
1427    def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1428        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1429        db_id = request.db_id
1430        env_id = request.env_id
1431
1432        # self.put_queue
1433        if env_id not in self.put_queue:
1434            self.put_queue[env_id] = dict()
1435        
1436        env_put_queue = self.put_queue[env_id]
1437        if db_id not in result_items:
1438            result_items[db_id] = dict()
1439        
1440        result_db_items = result_items[db_id]
1441        
1442        if db_id not in env_put_queue:
1443            env_put_queue[db_id] = dict()
1444        
1445        env_put_queue_db = env_put_queue[db_id]
1446
1447        # self.data_cache
1448        if env_id not in self.data_cache:
1449            self.data_cache[env_id] = dict()
1450        
1451        env_data_cache = self.data_cache[env_id]
1452        if db_id not in result_items:
1453            result_items[db_id] = dict()
1454        
1455        result_db_items = result_items[db_id]
1456        
1457        if db_id not in env_data_cache:
1458            env_data_cache[db_id] = dict()
1459        
1460        env_data_cache_db = env_data_cache[db_id]
1461
1462        # both
1463        for key, value in db_items.items():
1464            key = self.serializer.dumps(normalize_compound_key(key))
1465            
1466            exception = None
1467            result = None
1468            try:
1469                if key not in env_put_queue_db:
1470                    env_put_queue_db[key] = list()
1471                
1472                if key not in env_data_cache_db:
1473                    env_data_cache_db[key] = list()
1474                
1475                if isinstance(value, ValueInfo):
1476                    result = env_data_cache_db[key][0] = env_put_queue_db[key][0] = RawValueInfo(self.serializer.dumps(value.value), value.dupdata, value.overwrite, value.append)
1477                    if env_data_cache_db[key] and not value.dupdata:
1478                        env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1479                    else:
1480                        env_put_queue_db[key].append(result)
1481                        env_data_cache_db[key].append(result)
1482                else:
1483                    result = self.serializer.dumps(value)
1484                    if env_data_cache_db[key]:
1485                        env_data_cache_db[key][0] = env_put_queue_db[key][0] = result
1486                    else:
1487                        env_put_queue_db[key].append(result)
1488                        env_data_cache_db[key].append(result)
1489            except:
1490                exception = get_exception()
1491            
1492            result_db_items[key] = (result, exception)
1493        
1494        self.make_live()
1495        return True, result_items, None
1496    
1497    @check_request
1498    def _on_delete(self, request: DbRequest, key: KeyType) -> Tuple[bool, None, Optional[Exception]]:
1499        key = self.serializer.dumps(normalize_compound_key(key))
1500        if key.startswith(EnvInfo.db_name_prefix_bytes):
1501            return True, None, KeyError(f'Can not delete special key (db info key): {key}')
1502        
1503        db_id = request.db_id
1504        env_id = request.env_id
1505        
1506        exception = None
1507        result = None
1508        try:
1509            if env_id not in self.deletion_cache:
1510                self.deletion_cache[env_id] = dict()
1511            
1512            env_deletion_cache = self.deletion_cache[env_id]
1513            if db_id not in env_deletion_cache:
1514                env_deletion_cache[db_id] = set()
1515            
1516            env_deletion_cache_db = env_deletion_cache[db_id]
1517            env_deletion_cache_db.add(key)
1518        except:
1519            exception = get_exception()
1520        
1521        self.make_live()
1522        return True, result, exception
1523    
1524    @check_request
1525    def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType) -> Tuple[bool, RawValueType, Optional[Exception]]:
1526        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1527        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1528            return True, None, KeyError(f'Can not delete special raw_key (db info raw_key): {raw_key}')
1529        
1530        db_id = request.db_id
1531        env_id = request.env_id
1532        
1533        exception = None
1534        raw_value = None
1535        try:
1536            if env_id not in self.kv_deletion_cache:
1537                self.kv_deletion_cache[env_id] = dict()
1538            
1539            env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1540            if db_id not in env_kv_deletion_cache:
1541                env_kv_deletion_cache[db_id] = dict()
1542            
1543            env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1544            if raw_key not in env_kv_deletion_cache_db:
1545                env_kv_deletion_cache_db[raw_key] = list()
1546            
1547            raw_value = self.serializer.dumps(value)
1548            env_kv_deletion_cache_db[raw_key].append(raw_value)
1549        except:
1550            exception = get_exception()
1551        
1552        self.make_live()
1553        return True, (raw_key, raw_value), exception
1554    
1555    @check_request
1556    def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]:
1557        result_items: Dict[InputKeyType, Tuple[RawKeyType, Optional[Exception]]] = dict()
1558        db_id = request.db_id
1559        env_id = request.env_id
1560
1561        if env_id not in self.deletion_cache:
1562            self.deletion_cache[env_id] = dict()
1563        
1564        env_deletion_cache = self.deletion_cache[env_id]
1565        if db_id not in env_deletion_cache:
1566            env_deletion_cache[db_id] = set()
1567        
1568        env_deletion_cache_db = env_deletion_cache[db_id]
1569        for key in db_items:
1570            exception = None
1571            try:
1572                raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1573                if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1574                    exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
1575                else:
1576                    env_deletion_cache_db.add(raw_key)
1577            except:
1578                exception = get_exception()
1579            
1580            result_items[key] = (raw_key, exception)
1581
1582        self.make_live()
1583        return True, result_items, None
1584    
1585    @check_request
1586    def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1587        result_items: Dict[InputKeyType, Tuple[RawKeyType, RawValueType, Optional[Exception]]] = dict()
1588        db_id = request.db_id
1589        env_id = request.env_id
1590
1591        if env_id not in self.kv_deletion_cache:
1592            self.kv_deletion_cache[env_id] = dict()
1593        
1594        env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1595        if db_id not in env_kv_deletion_cache:
1596            env_kv_deletion_cache[db_id] = dict()
1597        
1598        env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1599        for key, value in db_items.items():
1600            exception = None
1601            raw_key = None
1602            raw_value = None
1603            try:
1604                raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
1605                if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1606                    exception = KeyError(f'Can not delete special key (db info key): {raw_key}')
1607                else:
1608                    if raw_key not in env_kv_deletion_cache_db:
1609                        env_kv_deletion_cache_db[raw_key] = list()
1610                    
1611                    raw_value = self.serializer.dumps(value)
1612                    env_kv_deletion_cache_db[raw_key].append(self.serializer.dumps(value))
1613            except:
1614                exception = get_exception()
1615            
1616            result_items[key] = (raw_key, raw_value, exception)
1617        
1618        self.make_live()
1619        return True, result_items, None
1620    
1621    def _on_open_databases(self, request: DbRequest, db_ids: Set[DbId], *args, **kwargs) -> ServiceProcessingResponse:
1622        try:
1623            env_info: EnvInfo = self.db_environments[request.env_id]
1624        except KeyError:
1625            exception = UnknownEnvError(request.env_id)
1626            return True, None, exception
1627        
1628        for db_id in db_ids:
1629            env_info.open_db(db_id, *args, **kwargs)
1630        
1631        env_info.env.sync(True)
1632        return True, None, None
1633    
1634    @check_request
1635    def _on_drop_db(self, request: DbRequest, delete: bool = False) -> ServiceProcessingResponse:
1636        coro_id = self.current_caller_coro_info.coro_id
1637        db_id = request.db_id
1638        env_id = request.env_id
1639
1640        if env_id not in self.drop_db_requests:
1641            self.drop_db_requests[env_id] = dict()
1642        
1643        drop_db_requests_env = self.drop_db_requests[env_id]
1644
1645        if db_id not in drop_db_requests_env:
1646            drop_db_requests_env[db_id] = (False, set())
1647        
1648        drop_db_requests_env_db: Tuple[bool, Set[CoroID]] = drop_db_requests_env[db_id]
1649
1650        if delete and (not drop_db_requests_env_db[0]):
1651            drop_db_requests_env_db = (True, drop_db_requests_env_db[1])
1652        
1653        drop_db_requests_env_db[1].add(coro_id)
1654        self.make_live()
1655        return False, None, None
1656    
1657    def sync_in_thread_pool(self, env_id: EnvId = None):
1658        async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
1659            if need_to_ensure_asyncio_loop:
1660                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
1661            else:
1662                if asyncio_loop is None:
1663                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
1664            
1665            async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
1666                def sync_worker():
1667                    env.sync(True)
1668                    self.write_locked.discard(env_id)
1669                
1670                await task_in_thread_pool(asyncio_loop, sync_worker)
1671
1672            env: lmdb.Environment = self.db_environments[env_id].env
1673            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop, env)))
1674            self.write_locked_coro_id.discard(i.coro_id)
1675            self.envs_in_sync.discard(env_id)
1676            def make_service_live_for_a_next_sync(self: 'Db'):
1677                self.make_live()
1678                self.wake_up_handle_registered = False
1679            
1680            if not self.wake_up_handle_registered:
1681                self.wake_up_handle_registered = True
1682                await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)
1683
1684        asyncio_loop = None
1685        need_to_ensure_asyncio_loop = False
1686        try:
1687            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
1688        except AsyncioLoopWasNotSetError:
1689            need_to_ensure_asyncio_loop = True
1690
1691        coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
1692        coro.is_background_coro = True
1693        self.write_locked.add(env_id)
1694        self.write_locked_coro_id.add(coro.coro_id)
1695        self.envs_in_sync.add(env_id)
Db( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
504    def __init__(self, loop: CoroSchedulerType):
505        super(Db, self).__init__(loop)
506        self.default_env_name: str = '__default__.dbenv'
507        self.default_envs_dir: str = 'db_envs'
508        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
509        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
510        self.drop_db_requests: Dict[EnvId, Dict[DbId, Tuple[bool, Set[CoroID]]]] = dict()
511        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
512        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
513        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, List[RawKeyType]]]] = dict()
514        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
515        self.put_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
516        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]] = dict()
517        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
518        self.max_data_cache_size: Union[None, int] = 10000
519        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
520        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[RawValueType]]]] = dict()
521        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
522        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
523        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
524        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
525        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
526        # self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
527        self.get_all_items_queue: Dict[EnvId, List[Tuple[CoroID, DbId]]] = dict()
528        self.open_db_environment_requests: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = dict()
529        self.root_path_to_db_environments_rel: RelativePath = None
530        self.app_name_waiter: CoroWrapperBase = None
531        self.default_db_environment: lmdb.Environment = None
532        self.db_environments: Dict[EnvId, EnvInfo] = dict()
533        # self.databases: Dict[Hashable, Any] = dict()
534        # self.db_names: Dict[DbId, DbName] = dict()
535        self.async_loop = None
536        self.sync_time_interval = 1.0
537        self.last_sync_time = perf_counter()
538        self.envs_need_to_be_sync: Set[DbId] = set()
539        self.envs_in_sync: Set[DbId] = set()
540        self.force_sync: Set[EnvId] = set()
541        self.sync_an_each_write: bool = False
542        self.write_locked: Set[EnvId] = set()
543        self.writes_num: int = 0
544        self.reads_num: int = 0
545        self.deletes_num: int = 0
546        self.db_drops_num: int = 0
547        self.write_locked_coro_id: Set[CoroID] = set()
548        self.wake_up_handle_registered: bool = False
549        # self.serializer = best_serializer_for_standard_data((
550        #                                     DataFormats.binary,
551        #                                     DataFormats.messagepack,
552        #                                     Tags.can_use_bytes,
553        #                                     Tags.decode_str_as_str,
554        #                                     Tags.decode_list_as_list,
555        #                                     Tags.decode_bytes_as_bytes,
556        #                                     Tags.superficial,
557        #                                     Tags.current_platform,
558        #                                     Tags.multi_platform,
559        #                                 ),
560        #                                 TestDataType.small,
561        #                                 0.1)
562        self.serializer = Serializer(Serializers.msgspec_messagepack)
563        self._request_workers = {
564            0: self._on_set_root_path_to_db_environments,
565            1: self._on_open_databases,
566            2: self._on_drop_db,
567            3: self._on_sync,
568            4: self._on_get,
569            5: self._on_get_items,
570            6: self._on_get_all_items,
571            7: self._on_put,
572            8: self._on_put_items,
573            9: self._on_delete,
574            10: self._on_delete_kv,
575            11: self._on_delete_items,
576            12: self._on_delete_kv_items,
577            13: self._on_open_db_environment,
578            14: self._on_get_first,
579            15: self._on_get_last,
580            16: self._on_get_n_items,
581            17: self._on_get_items_range,
582            18: self._on_close_db_environment,
583        }
default_env_name: str
default_envs_dir: str
drop_db_requests: Dict[Hashable, Dict[Hashable, Tuple[bool, Set[int]]]]
read_queue: Dict[Hashable, Dict[Hashable, Dict[bytes, Set[int]]]]
massive_read_queue: Dict[Hashable, Dict[int, Dict[Hashable, List[bytes]]]]
put_queue: Dict[Hashable, Dict[Hashable, Dict[bytes, List[Union[bytes, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.RawValueInfo]]]]]
data_cache: Dict[Hashable, Dict[Hashable, Dict[bytes, List[Union[bytes, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.RawValueInfo]]]]]
max_data_cache_size: Union[NoneType, int]
deletion_cache: Dict[Hashable, Dict[Hashable, Set[bytes]]]
kv_deletion_cache: Dict[Hashable, Dict[Hashable, Dict[bytes, List[bytes]]]]
get_first_queue: Dict[Hashable, Dict[Hashable, Set[int]]]
get_last_queue: Dict[Hashable, Dict[Hashable, Set[int]]]
get_n_items_queue: Dict[Hashable, Dict[int, Tuple[Hashable, bytes, int, bool]]]
get_items_range_queue: Dict[Hashable, Dict[int, Tuple[Hashable, bytes, bytes, int, bool]]]
get_all_items_queue: Dict[Hashable, List[Tuple[int, Hashable]]]
open_db_environment_requests: Dict[int, Tuple[Hashable, Union[NoneType, str], bool, Tuple, Dict]]
root_path_to_db_environments_rel: cengal.file_system.path_manager.versions.v_0.path_manager.RelativePath
app_name_waiter: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase
default_db_environment: Environment
db_environments: Dict[Hashable, EnvInfo]
async_loop
sync_time_interval
last_sync_time
envs_need_to_be_sync: Set[Hashable]
envs_in_sync: Set[Hashable]
force_sync: Set[Hashable]
sync_an_each_write: bool
write_locked: Set[Hashable]
writes_num: int
reads_num: int
deletes_num: int
db_drops_num: int
write_locked_coro_id: Set[int]
wake_up_handle_registered: bool
serializer
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
586    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
587        return type(self).__name__, {
588            'db_env_ids': list(self.db_environments.keys()),
589            'writes num': self.writes_num,
590            'reads num': self.reads_num,
591            'deletes num': self.deletes_num,
592            'db drops num': self.db_drops_num,
593        }
def norm_key( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Union[bytes, str, Tuple[Union[bytes, str]], tuple[Union[Tuple[Union[bytes, str]], Tuple[ForwardRef('NormalizedCompoundKeyType')]]]]:
595    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
596        return normalize_compound_key(key)
def raw_key( self, env_or_id: typing.Union[Environment, typing.Hashable], key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
598    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
599        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
600        if isinstance(env_or_id, lmdb.Environment):
601            env = env_or_id
602        else:
603            env = self.db_environments[env_or_id].env
604        
605        if len(raw_key) > env.max_key_size():
606            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
def destroy(self):
608    def destroy(self):
609        # TODO: we need to use some loop destroy service in order to put our coro which will write all pending queues,
610        # sync envirounments and close them. Also we need to prevent new requests from being processed.
611        db_environments_values = self.db_environments.values()
612        self.db_environments = type(self.db_environments)()
613        self.default_db_environment = None
614        for env_info in db_environments_values:
615            env_info.close()
def single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
617    def single_task_registration_or_immediate_processing(
618            self, *args, **kwargs) -> ServiceProcessingResponse:
619        result = self.try_resolve_request(*args, **kwargs)
620        if result is None:
621            self._ensure_default_db_environment()
622            return True, None, None
623        else:
624            return result
def full_processing_iteration(self):
 651    def full_processing_iteration(self):
 652        # TODO: combine all queues into one single transaction by env_id. Or at most two transactions: read and write.
 653        # This will improve performance and will give ability to move transations to other threads or even processess if needed.
 654
 655        # TODO: since DB can not handle big number of transactions per secons, it is better to combine all requests and process 
 656        # them at most as once a milisecond (it is frequently engough).
 657
 658        if not self._ensure_default_db_environment():
 659            return
 660        
 661        if self.force_sync:
 662            self.envs_need_to_be_sync |= self.force_sync
 663            self.force_sync = set()
 664        
 665        put_queue_buff = self.put_queue
 666        self.put_queue = type(put_queue_buff)()
 667        
 668        data_cache_buff = self.data_cache  # will be cleared at the end of the iteration if necessary
 669        
 670        read_queue_buff = self.read_queue
 671        self.read_queue = type(read_queue_buff)()
 672        
 673        massive_read_queue_buff = self.massive_read_queue
 674        self.massive_read_queue = type(massive_read_queue_buff)()
 675        
 676        deletion_cache_buff = self.deletion_cache
 677        self.deletion_cache = type(deletion_cache_buff)()
 678        
 679        kv_deletion_cache_buff = self.kv_deletion_cache
 680        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
 681        
 682        get_all_items_queue_buff = self.get_all_items_queue
 683        self.get_all_items_queue = type(get_all_items_queue_buff)()
 684
 685        get_first_queue_buff = self.get_first_queue
 686        self.get_first_queue = type(get_first_queue_buff)()
 687
 688        get_last_queue_buff = self.get_last_queue
 689        self.get_last_queue = type(get_last_queue_buff)()
 690
 691        get_n_items_queue_buff = self.get_n_items_queue
 692        self.get_n_items_queue = type(get_n_items_queue_buff)()
 693
 694        get_items_range_queue_buff = self.get_items_range_queue
 695        self.get_items_range_queue = type(get_items_range_queue_buff)()
 696
 697        open_db_environment_requests_buff: Dict[CoroID, Tuple[EnvId, Union[None, str], bool, Tuple, Dict]] = self.open_db_environment_requests
 698        self.open_db_environment_requests = type(open_db_environment_requests_buff)()
 699
 700        # open_db_environment
 701        for coro_id, request_info in open_db_environment_requests_buff.items():
 702            env_id, env_path, can_be_written_externally, args, kwargs = request_info
 703            if env_id in self.db_environments:
 704                self.register_response(coro_id, self.db_environments[env_id])
 705            else:
 706                exception = None
 707                try:
 708                    result = self._init_db_env(env_id, env_path, can_be_written_externally, *args, **kwargs)
 709                except:
 710                    exception = get_exception()
 711
 712                self.register_response(coro_id, result, exception)
 713
 714        # put
 715        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]):
 716            try:
 717                with env_info.env.begin(write=True) as txn:
 718                    for db_id, db_put_info in put_info.items():
 719                        if db_id in env_info.databases:
 720                            for raw_key, values in db_put_info.items():
 721                                for value in values:
 722                                    if isinstance(value, RawValueInfo):
 723                                        value, dupdata, overwrite, append = value
 724                                    else:
 725                                        dupdata, overwrite, append = True, True, False
 726                                    
 727                                    txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=dupdata, overwrite=overwrite, append=append)
 728                        
 729                        self.writes_num += len(db_put_info)
 730            except lmdb.MapFullError:
 731                raise DBError.from_exception(db_id)
 732        
 733        for env_id, put_info in put_queue_buff.items():
 734            if env_id in self.db_environments:
 735                self.envs_need_to_be_sync.add(env_id)
 736                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
 737        
 738        # TODO: implement replace* methods processing
 739
 740        # delete
 741        for env_id, deletion_cache_buff_db_info in deletion_cache_buff.items():
 742            if env_id in self.db_environments:
 743                env_info = self.db_environments[env_id]
 744                self.envs_need_to_be_sync.add(env_id)
 745                with env_info.env.begin(write=True) as txn:
 746                    for db_id, del_keys in deletion_cache_buff_db_info.items():
 747                        if db_id in env_info.databases:
 748                            for del_raw_key in del_keys:
 749                                txn.delete(del_raw_key, db=env_info.databases[db_id])
 750                                self.deletes_num += 1
 751                                try:
 752                                    data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 753                                except KeyError:
 754                                    pass
 755
 756        # delete_kv
 757        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
 758            if env_id in self.db_environments:
 759                env_info = self.db_environments[env_id]
 760                self.envs_need_to_be_sync.add(env_id)
 761                with env_info.env.begin(write=True) as txn:
 762                    for db_id, del_keys in kv_deletion_cache_buff_db_info.items():
 763                        if db_id in env_info.databases:
 764                            for del_raw_key, del_raw_values in del_keys.items():
 765                                for del_raw_value in del_raw_values:
 766                                    txn.delete(del_raw_key, del_raw_value, db=env_info.databases[db_id])
 767                                    self.deletes_num += 1
 768                                    try:
 769                                        raw_values: List[Union[RawValueType, RawValueInfo]] = data_cache_buff[env_id][db_id][del_raw_key]
 770                                        new_raw_values: List[Union[RawValueType, RawValueInfo]] = list()
 771                                        for raw_value in raw_values:
 772                                            raw_value_original = raw_value
 773                                            if isinstance(raw_value, RawValueInfo):
 774                                                raw_value, _, _, _ = raw_value
 775                                            
 776                                            if raw_value != del_raw_value:
 777                                                new_raw_values.append(raw_value_original)
 778                                            
 779                                        if new_raw_values:
 780                                            data_cache_buff[env_id][db_id][del_raw_key] = new_raw_values
 781                                        else:
 782                                            data_cache_buff[env_id][db_id].pop(del_raw_key, None)
 783                                    except KeyError:
 784                                        pass
 785
 786        # drop
 787        drop_db_requests_buff = self.drop_db_requests
 788        self.drop_db_requests = type(drop_db_requests_buff)()
 789        
 790        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, Tuple[bool, Set[CoroID]]]):
 791            for db_id, db_drop_info in drop_info.items():
 792                delete_db, coro_id = db_drop_info
 793                if db_id in env_info.databases:
 794                    try:
 795                        with env_info.env.begin(write=True) as txn:
 796                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
 797                            if delete_db:
 798                                del env_info.databases[db_id]
 799                                del env_info.db_names[db_id]
 800                        
 801                        self.db_drops_num += 1
 802                    except lmdb.MapFullError:
 803                        raise DBError.from_exception(db_id)
 804                    
 805                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 806                else:
 807                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 808        
 809        for env_id, drop_info in drop_db_requests_buff.items():
 810            if env_id in self.db_environments:
 811                self.envs_need_to_be_sync.add(env_id)
 812                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
 813            else:
 814                for db_id, db_drop_info in drop_info.items():
 815                    delete_db, coro_id = db_drop_info
 816                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 817
 818        # get
 819        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, List[Union[RawValueType, RawValueInfo]]]]]) -> Tuple[ValueType, Optional[Exception]]:
 820            key, db_id, env_id = key_info
 821            need_to_get_from_db = True
 822            try:
 823                values = data_cache_buff[env_id][db_id][key]
 824                if values:
 825                    value = values[0]
 826                    if isinstance(value, RawValueInfo):
 827                        value, _, _, _ = value
 828
 829                    need_to_get_from_db = False
 830            except KeyError:
 831                pass
 832            
 833            if need_to_get_from_db:
 834                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
 835                self.reads_num += 1
 836            
 837            exception = None
 838            try:
 839                if value is None:
 840                    exception = DbKeyError(key_info)
 841                else:
 842                    value = self.serializer.loads(value)
 843            except:
 844                exception = get_exception()
 845            
 846            return value, exception
 847        
 848        # _on_get
 849        for env_id, read_queue_buff_db_info in read_queue_buff.items():
 850            if env_id in self.db_environments:
 851                env_info = self.db_environments[env_id]
 852                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 853                    if db_id in env_info.databases:
 854                        with env_info.env.begin() as txn:
 855                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 856                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 857                                for coro_id in coro_ids:
 858                                    self.register_response(coro_id, value, exception)
 859                    else:
 860                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 861                            for coro_id in coro_ids:
 862                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 863            else:
 864                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 865                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 866                        for coro_id in coro_ids:
 867                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 868        
 869        # _on_get_items
 870        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
 871        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
 872            if env_id in self.db_environments:
 873                env_info = self.db_environments[env_id]
 874                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 875                    if coro_id not in results:
 876                        results[coro_id] = dict()
 877                    
 878                    coro_results = results[coro_id]
 879                    for db_id, raw_keys in read_queue_buff_db_info.items():
 880                        if db_id not in coro_results:
 881                            coro_results[db_id] = OrderedDict()
 882                        
 883                        coro_db_results = coro_results[db_id]
 884                        if db_id in env_info.databases:
 885                            with env_info.env.begin() as txn:
 886                                for raw_key in raw_keys:
 887                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 888                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
 889                        else:
 890                            for coro_id in coro_ids:
 891                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 892            else:
 893                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 894                    for db_id, raw_keys in read_queue_buff_db_info.items():
 895                        for coro_id in coro_ids:
 896                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 897        
 898        for coro_id, coro_results in results.items():
 899            if coro_results:
 900                db_id = tuple(coro_results.keys())[0]
 901                self.register_response(coro_id, coro_results[db_id], None)
 902            else:
 903                self.register_response(coro_id, OrderedDict(), None)
 904
 905        # get all items
 906        for env_id, requests_info in get_all_items_queue_buff.items():
 907            for request_info in requests_info:
 908                coro_id, db_id = request_info
 909                if env_id in self.db_environments:
 910                    env_info = self.db_environments[env_id]
 911                    env = env_info.env
 912                    if db_id in env_info.databases:
 913                        db = env_info.databases[db_id]
 914                        with env.begin(db=db) as txn:
 915                            result = list()
 916                            exception = None
 917                            try:
 918                                result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor() if not k.startswith(EnvInfo.db_name_prefix_bytes)]
 919                                self.reads_num += len(result)
 920                            except:
 921                                exception = get_exception()
 922                            
 923                            self.register_response(coro_id, result, exception)
 924                    else:
 925                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 926                else:
 927                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 928        
 929        # get_first
 930        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
 931            if env_id in self.db_environments:
 932                env_info = self.db_environments[env_id]
 933                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 934                    if db_id in env_info.databases:
 935                        db = env_info.databases[db_id]
 936                        with env_info.env.begin(db=db) as txn:
 937                            result = None
 938                            exception = None
 939                            try:
 940                                cursor: lmdb.Cursor = txn.cursor()
 941                                if cursor.first():
 942                                    key = cursor.key()
 943                                    key_found: bool = True
 944                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 945                                        key_found = cursor.next_nodup()
 946                                        if not key_found:
 947                                            break
 948
 949                                        key = cursor.key()
 950
 951                                    if key_found:
 952                                        value = cursor.value()
 953                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 954                                        self.reads_num += 1
 955                                    else:
 956                                        exception = KeyError()
 957                                else:
 958                                    exception = KeyError()
 959                            except:
 960                                exception = get_exception()
 961                            
 962                            for coro_id in coro_ids:
 963                                self.register_response(coro_id, result, exception)
 964                    else:
 965                        for coro_id in coro_ids:
 966                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 967            else:
 968                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 969                    for coro_id in coro_ids:
 970                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 971        
 972        # get_last
 973        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
 974            if env_id in self.db_environments:
 975                env_info = self.db_environments[env_id]
 976                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 977                    if db_id in env_info.databases:
 978                        db = env_info.databases[db_id]
 979                        with env_info.env.begin(db=db) as txn:
 980                            result = None
 981                            exception = None
 982                            try:
 983                                cursor: lmdb.Cursor = txn.cursor()
 984                                if cursor.last():
 985                                    key: bytes = cursor.key()
 986                                    key_found: bool = True
 987                                    key = cursor.key()
 988                                    while key.startswith(EnvInfo.db_name_prefix_bytes):
 989                                        key_found = cursor.prev_nodup()
 990                                        if not key_found:
 991                                            break
 992
 993                                        key = cursor.key()
 994
 995                                    if key_found:
 996                                        value: bytes = cursor.value()
 997                                        result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 998                                        self.reads_num += 1
 999                                    else:
1000                                        exception = KeyError()
1001                                else:
1002                                    exception = KeyError()
1003                            except:
1004                                exception = get_exception()
1005                            
1006                            for coro_id in coro_ids:
1007                                self.register_response(coro_id, result, exception)
1008                    else:
1009                        for coro_id in coro_ids:
1010                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1011            else:
1012                for db_id, coro_ids in get_last_queue_buff_db_info.items():
1013                    for coro_id in coro_ids:
1014                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1015        
1016        # get_n_items
1017        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
1018            if env_id in self.db_environments:
1019                env_info = self.db_environments[env_id]
1020                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1021                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1022                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1023                    if db_id in env_info.databases:
1024                        db = env_info.databases[db_id]
1025                        with env_info.env.begin(db=db) as txn:
1026                            exception = None
1027                            try:
1028                                cursor: lmdb.Cursor = txn.cursor()
1029                                if cursor.set_range(first_desired_raw_key):
1030                                    if reverse:
1031                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1032                                    else:
1033                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1034                                    
1035                                    for raw_key, raw_value in cursor_iterator:
1036                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1037                                            continue
1038
1039                                        if (num_items is not None) and (num_items <= 0):
1040                                            break
1041                                        
1042                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1043                                        self.reads_num += 1
1044                                        if num_items is not None:
1045                                            num_items -= 1
1046                                else:
1047                                    exception = KeyError()
1048                            except:
1049                                exception = get_exception()
1050                            
1051                            self.register_response(coro_id, coro_results, exception)
1052                    else:
1053                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1054            else:
1055                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
1056                    for coro_id in coro_ids:
1057                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1058        
1059        # get_items_range
1060        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
1061            if env_id in self.db_environments:
1062                env_info = self.db_environments[env_id]
1063                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1064                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
1065                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
1066                    if db_id in env_info.databases:
1067                        db = env_info.databases[db_id]
1068                        with env_info.env.begin(db=db) as txn:
1069                            exception = None
1070                            try:
1071                                cursor: lmdb.Cursor = txn.cursor()
1072                                if cursor.set_range(first_desired_raw_key):
1073                                    if reverse:
1074                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
1075                                    else:
1076                                        cursor_iterator = cursor.iternext(keys=True, values=True)
1077                                    
1078                                    for raw_key, raw_value in cursor_iterator:
1079                                        if raw_key.startswith(EnvInfo.db_name_prefix_bytes):
1080                                            continue
1081                                        
1082                                        if reverse:
1083                                            if raw_key < last_desired_raw_key:
1084                                                break
1085                                        else:
1086                                            if raw_key > last_desired_raw_key:
1087                                                break
1088
1089                                        if (num_items is not None) and (num_items <= 0):
1090                                            break
1091
1092                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
1093                                        self.reads_num += 1
1094                                        if num_items is not None:
1095                                            num_items -= 1
1096                                else:
1097                                    exception = KeyError()
1098                            except:
1099                                exception = get_exception()
1100                            
1101                            self.register_response(coro_id, coro_results, exception)
1102                    else:
1103                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
1104            else:
1105                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
1106                    for coro_id in coro_ids:
1107                        self.register_response(coro_id, None, UnknownEnvError(env_id))
1108        
1109        need_to_sync = self.sync_an_each_write
1110
1111        # periodic sync
1112        if (perf_counter() - self.last_sync_time) >= self.sync_time_interval:
1113            for env_id, env_info in self.db_environments.items():
1114                self.envs_need_to_be_sync.add(env_id)
1115                need_to_sync = True
1116
1117        # sync
1118        if need_to_sync and self.envs_need_to_be_sync:
1119            envs_need_to_be_sync_bak = self.envs_need_to_be_sync - self.envs_in_sync
1120            self.envs_need_to_be_sync = set(self.envs_in_sync)
1121            for env_id in envs_need_to_be_sync_bak:
1122                self.sync_in_thread_pool(env_id)
1123            
1124            self.last_sync_time = perf_counter()
1125        
1126        # invalidate data_cache for envs that can be written externally
1127        for env_id, env_info in self.db_environments.items():
1128            if env_info.init_info.can_be_written_externally:
1129                self.data_cache.pop(env_id, None)
1130        
1131        # clear too big caches
1132        for env_id, env_info in self.db_environments.items():
1133            if env_id in self.data_cache:
1134                if len(self.data_cache[env_id]) > self.max_data_cache_size:
1135                    del self.data_cache[env_id]
1136
1137        self.make_dead()
def in_work(self) -> bool:
1139    def in_work(self) -> bool:
1140        result: bool = bool(self.default_db_environment is None) \
1141                            or bool(self.open_db_environment_requests) \
1142                            or bool(self.get_first_queue) \
1143                            or bool(self.get_last_queue) \
1144                            or bool(self.get_n_items_queue) \
1145                            or bool(self.get_items_range_queue) \
1146                            or bool(self.read_queue) \
1147                            or bool(self.massive_read_queue) \
1148                            or bool(self.get_all_items_queue)\
1149                            or bool(self.force_sync) \
1150                            or bool(self.deletion_cache) \
1151                            or bool(self.kv_deletion_cache) \
1152                            or bool(self.drop_db_requests) \
1153                            or bool(self.put_queue) \
1154                            or bool(self.envs_need_to_be_sync) \
1155                            or ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)
1156        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
    """Report how much of ``sync_time_interval`` is left since the last sync.

    Returns:
        A ``(True, seconds)`` pair. ``seconds`` is the remaining part of
        ``sync_time_interval`` counted from ``last_sync_time``, or ``0`` once
        the interval has already elapsed. The first element is always ``True``
        (presumably "a next event time is known" - TODO confirm against Service).
    """
    elapsed: float = perf_counter() - self.last_sync_time
    interval = self.sync_time_interval
    remaining = interval - elapsed if interval > elapsed else 0
    return True, remaining
def sync_in_thread_pool(self, env_id: typing.Hashable = None):
def sync_in_thread_pool(self, env_id: EnvId = None):
    """Start a background coroutine that syncs one environment off-thread.

    The environment is marked as write-locked and "in sync" right after the
    coroutine is scheduled; the marks are removed by the coroutine (and by the
    worker itself for ``write_locked``) once ``env.sync(True)`` has finished.

    Args:
        env_id: key into ``self.db_environments`` of the environment to sync.

    NOTE(review): if ``env.sync(True)`` raises, nothing visible here clears
    ``write_locked`` / ``envs_in_sync`` for ``env_id`` - confirm that the
    service handles that path elsewhere (``write_locked_coro_id`` suggests so).
    """
    async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
        # Get hold of an asyncio loop: create/ensure one when the service had
        # none at scheduling time, otherwise fetch it only if it was not passed in.
        if need_to_ensure_asyncio_loop:
            asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
        elif asyncio_loop is None:
            asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

        def make_service_live_for_a_next_sync(self: 'Db'):
            # Timer callback: wake the service up and allow a new timer to be registered.
            self.make_live()
            self.wake_up_handle_registered = False

        async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
            def sync_worker():
                # Executed through task_in_thread_pool(); releases the write lock
                # as soon as the forced sync returns.
                env.sync(True)
                self.write_locked.discard(env_id)

            await task_in_thread_pool(asyncio_loop, sync_worker)

        target_env: lmdb.Environment = self.db_environments[env_id].env
        await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop, target_env)))
        self.write_locked_coro_id.discard(i.coro_id)
        self.envs_in_sync.discard(env_id)

        # Register at most one pending wake-up timer at a time.
        if self.wake_up_handle_registered:
            return

        self.wake_up_handle_registered = True
        await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

    try:
        asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
    except AsyncioLoopWasNotSetError:
        # No loop yet: let the coroutine ensure one itself.
        asyncio_loop = None
        need_to_ensure_asyncio_loop = True
    else:
        need_to_ensure_asyncio_loop = False

    coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
    coro.is_background_coro = True
    self.write_locked.add(env_id)
    self.write_locked_coro_id.add(coro.coro_id)
    self.envs_in_sync.add(env_id)
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
is_low_latency
make_live
make_dead
service_id_impl
service_id
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class DbRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class DbRequest(ServiceRequest):
    """Request builder for the ``Db`` service.

    An instance carries the settings shared by every request (``env_id``,
    ``db_id``, ``needs_sync``, ``can_wait``). Each public method hands a numeric
    request type plus its arguments to ``_save_to_copy()`` (inherited from
    ``ServiceRequest``; not shown here) and returns whatever that call returns.

    NOTE(review): the return annotations appear to describe the value the
    awaiting coroutine eventually receives rather than the object returned by
    the method itself - confirm against ``ServiceRequest``.
    """

    def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False):
        # env_id / db_id: which environment and database the request addresses.
        # needs_sync: stored as given; its handling lives in the service.
        super().__init__()
        self.env_id: EnvId = env_id
        self.db_id: DbId = db_id
        self.needs_sync: bool = needs_sync
        self.provide_to_request_handler = True
        self.can_wait: bool = can_wait  # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.
    
    def _copy(self) -> 'DbRequest':
        # Carry over every constructor argument, including `can_wait`, so that a
        # copy never silently falls back to the default flag value.
        return DbRequest(self.env_id, self.db_id, self.needs_sync, self.can_wait)
    
    def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
        return self._save_to_copy(0, root_path_to_db_environments)
    
    def open_databases(self, db_ids: Set[DbId], *args, **kwargs) -> None:
        return self._save_to_copy(1, db_ids, *args, **kwargs)
    
    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
        return self._save_to_copy(2, db_id, delete)
    
    def sync(self) -> None:
        return self._save_to_copy(3)
    
    def wait_sync(self) -> None:  # TODO: implement
        return self._save_to_copy(22)
    
    def set_sync_timeout(self, timeout: RationalNumber) -> None:  # TODO: implement
        return self._save_to_copy(23, timeout)
    
    def get(self, key: InputKeyType) -> ValueType:
        return self._save_to_copy(4, key)
    
    def get_first(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns first item in DB
        return self._save_to_copy(14)
    
    def get_last(self) -> Tuple[NormalizedKeyType, ValueType]:
        # Returns last item in DB
        return self._save_to_copy(15)
    
    def get_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(5, db_keys)
    
    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        # Same request type as get_reverse_n_items; only the `reverse` flag differs.
        return self._save_to_copy(16, desired_key, num, reverse=False)
    
    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(16, desired_key, num, reverse=True)
    
    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        # Same request type as get_reverse_items_range; only the `reverse` flag differs.
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=False)
    
    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, reverse=True)
    
    def get_all_items(self) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
        # Returns all items from DB
        return self._save_to_copy(6)
    
    def pop(self, key: InputKeyType) -> ValueType:  # TODO: implement
        return self._save_to_copy(24, key)
    
    def pop_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:  # TODO: implement
        return self._save_to_copy(25, db_keys)
    
    def put(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> RawValueType:
        return self._save_to_copy(7, key, value)
    
    def put_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        return self._save_to_copy(8, db_items)
    
    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
        return self._save_to_copy(19, key, value)
    
    inc = increment
    
    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
        # NOTE(review): request type 8 is the same one `put_items` uses; this looks like
        # a placeholder until increment is implemented - confirm before relying on it.
        return self._save_to_copy(8, db_items)
    
    inc_items = increment_items
    
    def replace(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> Tuple[RawValueType, ValueType]:  # TODO: implement
        return self._save_to_copy(20, key, value)
    
    def replace_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, ValueType, Optional[Exception]]]]:  # TODO: implement
        return self._save_to_copy(21, db_items)
    
    def delete(self, key: InputKeyType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(9, key)
    
    def delete_kv(self, key: InputKeyType, value: ValueType) -> RawValueType:  # TODO: finish an implementation
        return self._save_to_copy(10, key, value)
    
    def delete_items(self, db_items: Set[InputKeyType]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(11, db_items)
    
    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
        return self._save_to_copy(12, db_items)
    
    def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None:  # TODO: implement
        # Will execute the given callable or coroutine in a transaction (after all other queues have been processed).
        # It runs in the current thread, so a plain callable blocks the current thread until it finishes.
        # A coroutine, on the other hand, keeps the environment and all its databases locked until it finishes.
        return self._save_to_copy(29, env_id, callable_or_coro)

    def open_db_environment(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvId:
        return self._save_to_copy(13, env_id, env_path, can_be_written_externally, *args, **kwargs)
    
    def close_db_environment(self, env_id: EnvId) -> None:
        return self._save_to_copy(18, env_id)
    
    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Lock all databases if db_names is None. Databases will be released automatically when coroutine execution is finished
        return self._save_to_copy(26, db_names)
    
    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:  # TODO: implement
        # Tries to lock all databases if db_names is None. Returns True if the attempt was successful, False otherwise. Databases will be released automatically when coroutine execution is finished
        return self._save_to_copy(27, db_names)
    
    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
        # Unlock all databases if db_names is None
        return self._save_to_copy(28, db_names)
DbRequest( env_id: typing.Hashable = None, db_id: typing.Hashable = None, needs_sync: bool = False, can_wait: bool = False)
def __init__(self, env_id: EnvId = None, db_id: DbId = None, needs_sync: bool = False, can_wait: bool = False):
    """Store the settings shared by every request built from this object.

    Args:
        env_id: environment the request addresses.
        db_id: database the request addresses.
        needs_sync: stored as given; its handling lives in the service.
        can_wait: stored as given; not implemented yet (see TODO below).
    """
    super().__init__()
    self.provide_to_request_handler = True

    # Addressing
    self.env_id: EnvId = env_id
    self.db_id: DbId = db_id

    # Flags
    self.needs_sync: bool = needs_sync
    # TODO: implement. If True then request can wait for a next iteration in attempt to create a bunch of requests. If False then request will be processed immediately.
    self.can_wait: bool = can_wait
env_id: Hashable
db_id: Hashable
needs_sync: bool
provide_to_request_handler
can_wait: bool
def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
def set_root_path_to_db_environments(self, root_path_to_db_environments: str) -> bool:
    """Issue request type 0 with the given root path through ``_save_to_copy``."""
    request_type = 0
    return self._save_to_copy(request_type, root_path_to_db_environments)
def open_databases(self, db_ids: typing.Set[typing.Hashable], *args, **kwargs) -> None:
def open_databases(self, db_ids: Set[DbId], *args, **kwargs) -> None:
    """Issue request type 1 with ``db_ids`` and any extra arguments forwarded untouched."""
    request_type = 1
    return self._save_to_copy(request_type, db_ids, *args, **kwargs)
def drop_db(self, db_id: typing.Hashable, delete: bool = False) -> None:
def drop_db(self, db_id: DbId, delete: bool = False) -> None:
    """Issue request type 2 for ``db_id``; ``delete`` is forwarded as given."""
    request_type = 2
    return self._save_to_copy(request_type, db_id, delete)
def sync(self) -> None:
def sync(self) -> None:
    """Issue request type 3 (no arguments) through ``_save_to_copy``."""
    request_type = 3
    return self._save_to_copy(request_type)
def wait_sync(self) -> None:
def wait_sync(self) -> None:  # TODO: implement
    """Issue request type 22 (no arguments) through ``_save_to_copy``."""
    request_type = 22
    return self._save_to_copy(request_type)
def set_sync_timeout(self, timeout: typing.Union[int, float]) -> None:
def set_sync_timeout(self, timeout: RationalNumber) -> None:  # TODO: implement
    """Issue request type 23 with ``timeout`` through ``_save_to_copy``."""
    request_type = 23
    return self._save_to_copy(request_type, timeout)
def get( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Any:
def get(self, key: InputKeyType) -> ValueType:
    """Issue request type 4 for ``key`` through ``_save_to_copy``."""
    request_type = 4
    return self._save_to_copy(request_type, key)
def get_first( self) -> tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def get_first(self) -> Tuple[NormalizedKeyType, ValueType]:
    """Issue request type 14: the first item in the DB."""
    request_type = 14
    return self._save_to_copy(request_type)
def get_last( self) -> tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def get_last(self) -> Tuple[NormalizedKeyType, ValueType]:
    """Issue request type 15: the last item in the DB."""
    request_type = 15
    return self._save_to_copy(request_type)
def get_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 5 for the given sequence of keys."""
    request_type = 5
    return self._save_to_copy(request_type, db_keys)
def get_n_items( self, desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 16 with ``reverse=False``.

    ``desired_key`` and ``num`` are forwarded positionally, in that order.
    """
    request_type = 16
    return self._save_to_copy(request_type, desired_key, num, reverse=False)
def get_reverse_n_items( self, desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 16 with ``reverse=True``.

    ``desired_key`` and ``num`` are forwarded positionally, in that order.
    """
    request_type = 16
    return self._save_to_copy(request_type, desired_key, num, reverse=True)
def get_items_range( self, first_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], 
typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 17 with ``reverse=False``.

    Both boundary keys and ``num`` are forwarded positionally, in that order.
    """
    request_type = 17
    return self._save_to_copy(request_type, first_desired_key, last_desired_key, num, reverse=False)
def get_reverse_items_range( self, first_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, 
str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], num: typing.Union[int, NoneType] = None) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 17 with ``reverse=True``.

    Both boundary keys and ``num`` are forwarded positionally, in that order.
    """
    request_type = 17
    return self._save_to_copy(request_type, first_desired_key, last_desired_key, num, reverse=True)
def get_all_items( self) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def get_all_items(self) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:
    """Issue request type 6: all items from the DB."""
    request_type = 6
    return self._save_to_copy(request_type)
def pop( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> Any:
def pop(self, key: InputKeyType) -> ValueType:  # TODO: implement
    """Issue request type 24 for ``key`` through ``_save_to_copy``."""
    request_type = 24
    return self._save_to_copy(request_type, key)
def pop_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]:
def pop_items(self, db_keys: Sequence[InputKeyType]) -> OrderedDictType[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]:  # TODO: implement
    """Issue request type 25 for the given sequence of keys."""
    request_type = 25
    return self._save_to_copy(request_type, db_keys)
def put( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo, NoneType] = None) -> bytes:
def put(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> RawValueType:
    """Issue request type 7 with ``key`` and ``value`` (``None`` by default)."""
    request_type = 7
    return self._save_to_copy(request_type, key, value)
def put_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
def put_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
    """Issue request type 8 with the given key-to-value mapping."""
    request_type = 8
    return self._save_to_copy(request_type, db_items)
def increment( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[int, float] = 1) -> Union[int, float]:
346    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
347        return self._save_to_copy(19, key, value)
def inc( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[int, float] = 1) -> Union[int, float]:
346    def increment(self, key: InputKeyType, value: RationalNumber = 1) -> RationalNumber:  # TODO: implement
347        return self._save_to_copy(19, key, value)
def increment_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[int, float, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Union[int, float], typing.Union[Exception, NoneType]]]:
351    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
352        return self._save_to_copy(8, db_items)
def inc_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[int, float, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> collections.OrderedDict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Union[int, float], typing.Union[Exception, NoneType]]]:
351    def increment_items(self, db_items: Dict[InputKeyType, Union[RationalNumber, ValueInfo]]) -> OrderedDictType[NormalizedKeyType, Tuple[RationalNumber, Optional[Exception]]]:  # TODO: implement
352        return self._save_to_copy(8, db_items)
def replace( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo, NoneType] = None) -> Tuple[bytes, Any]:
356    def replace(self, key: InputKeyType, value: Optional[Union[ValueType, ValueInfo]] = None) -> Tuple[RawValueType, ValueType]:  # TODO: implement
357        return self._save_to_copy(20, key, value)
def replace_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Union[typing.Any, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.ValueInfo]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Any, Union[Exception, NoneType]]]]:
359    def replace_items(self, db_items: Dict[InputKeyType, Union[ValueType, ValueInfo]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, ValueType, Optional[Exception]]]]:  # TODO: implement
360        return self._save_to_copy(21, db_items)
def delete( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
362    def delete(self, key: InputKeyType) -> RawValueType:  # TODO: finish an implementation
363        return self._save_to_copy(9, key)
def delete_kv( self, key: typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], value: typing.Any) -> bytes:
365    def delete_kv(self, key: InputKeyType, value: ValueType) -> RawValueType:  # TODO: finish an implementation
366        return self._save_to_copy(10, key, value)
def delete_items( self, db_items: set[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
368    def delete_items(self, db_items: Set[InputKeyType]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
369        return self._save_to_copy(11, db_items)
def delete_kv_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Tuple[typing.Any]]) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
371    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]]) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:  # TODO: finish an implementation
372        return self._save_to_copy(12, db_items)
def execute_in_transaction( self, env_id: typing.Hashable, callable_or_coro: typing.Union[typing.Callable, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]]) -> None:
374    def execute_in_transaction(self, env_id: EnvId, callable_or_coro: Union[Callable, AnyWorker]) -> None:  # TODO: implement
375        # Executes the given callable or coroutine in a transaction (after all other queues have been processed).
376        # It runs in the current thread, so it blocks the current thread until it has finished if it is not a coroutine.
377        # On the other hand, it locks the environment and all databases in it until the coroutine has finished.
378        return self._save_to_copy(29, env_id, callable_or_coro)
def open_db_environment( self, env_id: typing.Hashable, env_path: typing.Union[NoneType, str], can_be_written_externally: bool, *args, **kwargs) -> Hashable:
380    def open_db_environment(self, env_id: EnvId, env_path: Union[None, str], can_be_written_externally: bool, *args, **kwargs) -> EnvId:
381        return self._save_to_copy(13, env_id, env_path, can_be_written_externally, *args, **kwargs)
def close_db_environment(self, env_id: typing.Hashable) -> None:
383    def close_db_environment(self, env_id: EnvId) -> None:
384        return self._save_to_copy(18, env_id)
def lock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> None:
386    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
387        # Lock all databases if db_names is None. Databases will be released automatically when coroutine execution is finished
388        return self._save_to_copy(26, db_names)
def try_lock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> bool:
390    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:  # TODO: implement
391        # Tries to lock all databases if db_names is None. Returns True if the attempt was successful, False otherwise. Databases will be released automatically when coroutine execution is finished
392        return self._save_to_copy(27, db_names)
def unlock_databases( self, db_names: typing.Union[typing.Set[typing.Hashable], NoneType] = None) -> None:
394    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:  # TODO: implement
395        # Unlock all databases if db_names is None
396        return self._save_to_copy(28, db_names)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'Db'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
interface
i
async_interface
ai
KeyType = typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]
RawKeyType = <class 'bytes'>
ValueType = typing.Any
RawValueType = <class 'bytes'>
DbId = typing.Hashable
EnvId = typing.Hashable
DbName = <class 'bytes'>
class DbKeyError(builtins.KeyError):
185class DbKeyError(KeyError):
186    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
187        super().__init__(*args)
188        self.key_info: Tuple[KeyType, DbId] = key_info

Mapping key not found.

DbKeyError( key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable], *args: object)
186    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
187        super().__init__(*args)
188        self.key_info: Tuple[KeyType, DbId] = key_info
key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable]
Inherited Members
builtins.BaseException
with_traceback
args
class EnvInfo:
137class EnvInfo:
138    db_name_prefix = '__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
139    db_name_prefix_bytes = b'__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
140
141    def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None):
142        self.init_info: EnvInitInfo = init_info
143        if not os.path.exists(init_info.env_path) or (os.path.exists(init_info.env_path) and not os.path.isdir(init_info.env_path)):
144            os.makedirs(init_info.env_path)
145        
146        self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env
147        self.env_id: EnvId = init_info.env_id
148        self.databases: Dict[DbId, lmdb._Database] = dict()
149        self.db_names: Dict[DbId, DbName] = dict()
150        self.prepare_main_db()
151    
152    @staticmethod
153    def gen_db_name_from_db_id(db_id: DbId) -> Union[None, bytes]:
154        if db_id is None:
155            return None
156        
157        return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8')
158    
159    def db_name_by_db_id(self, db_id: DbId) -> Union[None, bytes]:
160        try:
161            return self.db_names[db_id]
162        except KeyError:
163            raise UnknownEnvDBError(self.env_id, db_id)
164    
165    def db_by_db_id(self, db_id: DbId) -> lmdb._Database:
166        try:
167            return self.databases[db_id]
168        except KeyError:
169            raise UnknownEnvDBError(self.env_id, db_id)
170    
171    def open_db(self, db_id: DbId, *args, **kwargs) -> lmdb._Database:
172            db_name: Union[None, bytes] = self.gen_db_name_from_db_id(db_id)
173            new_db: lmdb._Database = self.env.open_db(db_name, *args, **kwargs)
174            self.databases[db_id] = new_db
175            self.db_names[db_id] = db_name
176            return new_db
177    
178    def prepare_main_db(self) -> lmdb._Database:
179        self.open_db(None)
180    
181    def close(self):
182        self.env.close()
EnvInfo( init_info: cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.EnvInitInfo, env: typing.Union[Environment, NoneType] = None)
141    def __init__(self, init_info: EnvInitInfo, env: Optional[lmdb.Environment] = None):
142        self.init_info: EnvInitInfo = init_info
143        if not os.path.exists(init_info.env_path) or (os.path.exists(init_info.env_path) and not os.path.isdir(init_info.env_path)):
144            os.makedirs(init_info.env_path)
145        
146        self.env: lmdb.Environment = lmdb.Environment(*init_info.args, **init_info.kwargs) if env is None else env
147        self.env_id: EnvId = init_info.env_id
148        self.databases: Dict[DbId, lmdb._Database] = dict()
149        self.db_names: Dict[DbId, DbName] = dict()
150        self.prepare_main_db()
db_name_prefix = '__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
db_name_prefix_bytes = b'__db_name_key_16318cf6_3e16_4881_a22f_328aa41c0d4f__'
init_info: cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_1.db.EnvInitInfo
env: Environment
env_id: Hashable
databases: Dict[Hashable, _Database]
db_names: Dict[Hashable, bytes]
@staticmethod
def gen_db_name_from_db_id(db_id: typing.Hashable) -> Union[NoneType, bytes]:
152    @staticmethod
153    def gen_db_name_from_db_id(db_id: DbId) -> Union[None, bytes]:
154        if db_id is None:
155            return None
156        
157        return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8')
def db_name_by_db_id(self, db_id: typing.Hashable) -> Union[NoneType, bytes]:
159    def db_name_by_db_id(self, db_id: DbId) -> Union[None, bytes]:
160        try:
161            return self.db_names[db_id]
162        except KeyError:
163            raise UnknownEnvDBError(self.env_id, db_id)
def db_by_db_id(self, db_id: typing.Hashable) -> _Database:
165    def db_by_db_id(self, db_id: DbId) -> lmdb._Database:
166        try:
167            return self.databases[db_id]
168        except KeyError:
169            raise UnknownEnvDBError(self.env_id, db_id)
def open_db(self, db_id: typing.Hashable, *args, **kwargs) -> _Database:
171    def open_db(self, db_id: DbId, *args, **kwargs) -> lmdb._Database:
172            db_name: Union[None, bytes] = self.gen_db_name_from_db_id(db_id)
173            new_db: lmdb._Database = self.env.open_db(db_name, *args, **kwargs)
174            self.databases[db_id] = new_db
175            self.db_names[db_id] = db_name
176            return new_db
def prepare_main_db(self) -> _Database:
178    def prepare_main_db(self) -> lmdb._Database:
179        self.open_db(None)
def close(self):
181    def close(self):
182        self.env.close()