cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db

   1#!/usr/bin/env python
   2# coding=utf-8
   3
   4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
   5# 
   6# Licensed under the Apache License, Version 2.0 (the "License");
   7# you may not use this file except in compliance with the License.
   8# You may obtain a copy of the License at
   9# 
  10#     http://www.apache.org/licenses/LICENSE-2.0
  11# 
  12# Unless required by applicable law or agreed to in writing, software
  13# distributed under the License is distributed on an "AS IS" BASIS,
  14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15# See the License for the specific language governing permissions and
  16# limitations under the License.
  17
  18
  19__all__ = ['Db', 'DbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType', 'DbId', 'DbName', 'DbKeyError']
  20
  21from cengal.parallel_execution.coroutines.coro_scheduler import *
  22from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
  23from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
  24from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
  25from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
  26from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
  27from cengal.parallel_execution.coroutines.coro_standard_services.instance import *
  28from cengal.file_system.file_manager import path_relative_to_current_dir
  29from cengal.time_management.cpu_clock_cycles import perf_counter
  30from cengal.data_manipulation.serialization import *
  31from cengal.introspection.inspect import get_exception
  32from cengal.file_system.app_fs_structure.app_dir_path import AppDirPath, AppDirectoryType
  33from cengal.file_system.path_manager import RelativePath
  34from typing import Hashable, Tuple, List, Any, Dict, Callable, Sequence
  35import sys
  36import os
  37import asyncio
  38try:
  39    import lmdb
  40except ImportError:
  41    from warnings import warn
  42    warn('''WARNING: `lmdb` library is not installed. Db service will not work.
  43         To install `lmdb` use: `pip install lmdb`''')
  44    raise
  45
  46from os.path import normpath
  47from uuid import uuid4
  48
  49
  50"""
  51Module Docstring
  52Docstrings: http://www.python.org/dev/peps/pep-0257/
  53"""
  54
# Module metadata attributes.
__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# Only one __status__ assignment is active; the commented-out lines hold the alternative values.
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"
  64# __status__ = "Production"
  65
  66
  67SingleKeyType = Union[bytes, str]
  68CompoundKeyType = Union[List[SingleKeyType], Set[SingleKeyType], Tuple[SingleKeyType], List['CompoundKeyType'], Set['CompoundKeyType'], Tuple['CompoundKeyType']]
  69KeyType = Union[SingleKeyType, CompoundKeyType]
  70NormalizedCompoundKeyType = Union[Tuple[SingleKeyType], Tuple['NormalizedCompoundKeyType']]
  71NormalizedKeyType = Union[SingleKeyType, NormalizedCompoundKeyType]
  72RawKeyType = bytes  # By default record keys are limited to 511 bytes in length, however this can be adjusted by rebuilding the library. The compile-time key length can be queried via Environment.max_key_size()
  73ValueType = Any
  74RawValueType = bytes
  75DbId = Hashable
  76DbName = bytes
  77EnvId = Hashable
  78
  79
  80class KeyInfo:
  81    def __init__(self, key: KeyType, db_id: DbId = None, env_id: EnvId = None):
  82        self.key: KeyType = key
  83        self.db_id: DbId = db_id
  84        self.env_id: EnvId = env_id
  85
  86
  87class EnvInitInfo:
  88    def __init__(self, path_to_db_environment: str, *args, **kwargs):
  89        self.path_to_default_db_environment: str = normpath(path_to_db_environment)
  90        self.args: Tuple = (self.path_to_default_db_environment,) + args
  91        self.kwargs: Dict = kwargs
  92
  93
  94class EnvInfo:
  95    db_name_prefix = '__db_name_key__'
  96
  97    def __init__(self, init_info: EnvInitInfo, env: lmdb.Environment):
  98        self.init_info: EnvInitInfo = init_info
  99        self.env: lmdb.Environment = env
 100        self.env_id: EnvId = init_info.path_to_default_db_environment
 101        self.databases: Dict[Hashable, lmdb._Database] = dict()
 102        self.db_names: Dict[DbId, DbName] = dict()
 103    
 104    @staticmethod
 105    def gen_db_name_from_db_id(db_id: DbId) -> bytes:
 106        return f'{EnvInfo.db_name_prefix}{db_id}'.encode('utf-8')
 107    
 108    def db_name_by_db_id(self, db_id: DbId) -> bytes:
 109        db_name: bytes = EnvInfo.gen_db_name_from_db_id(db_id)
 110        try:
 111            return self.db_names[db_name]
 112        except KeyError:
 113            raise UnknownEnvDBError(self.env_id, db_id)
 114    
 115    def db_by_db_id(self, db_id: DbId) -> lmdb._Database:
 116        try:
 117            return self.databases[db_id]
 118        except KeyError:
 119            raise UnknownEnvDBError(self.env_id, db_id)
 120
 121
 122class DbKeyError(KeyError):
 123    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
 124        super().__init__(*args)
 125        self.key_info: Tuple[KeyType, DbId] = key_info
 126
 127
 128class DBError(Exception):
 129    def __init__(self, db_id: DbId, original_exception: Exception, *args):
 130        super().__init__(*args)
 131        self.db_id: DbId = db_id
 132        self.original_exception = original_exception
 133    
 134    @staticmethod
 135    def from_exception(db_id: DbId) -> 'DBError':
 136        return DBError(db_id, get_exception())
 137
 138
 139class UnknownEnvError(Exception):
 140    pass
 141
 142
 143class UnknownEnvDBError(Exception):
 144    pass
 145
 146
 147class WrongKeyTypeError(TypeError):
 148    pass
 149
 150
 151class RawKeyIsTooLargeError(ValueError):
 152    pass
 153
 154
 155class NormalizedCompoundKey:
 156    def __init__(self, normalized_key: NormalizedKeyType) -> None:
 157        self.normalized_key: NormalizedKeyType = normalized_key
 158    
 159    def __call__(self) -> Any:
 160        return self.normalized_key
 161
 162    @staticmethod
 163    def from_key(key: KeyType):
 164        return NormalizedCompoundKey(normalize_compound_key(key))
 165
 166
# Short alias for NormalizedCompoundKey.
NCK = NormalizedCompoundKey
# A key argument may be either a NormalizedCompoundKey wrapper or any plain key form.
InputKeyType = Union[NormalizedCompoundKey, KeyType]
 169
 170
 171def is_normalized_compound_key(key: KeyType) -> bool:
 172    if isinstance(key, (bytes, str, int, float)):
 173        return True
 174    elif isinstance(key, tuple):
 175        return all(is_normalized_compound_key(item) for item in key)
 176    else:
 177        return False
 178
 179
 180def normalize_compound_key(key: KeyType) -> NormalizedKeyType:
 181    if isinstance(key, NormalizedCompoundKey):
 182        return key()
 183    elif is_normalized_compound_key(key):
 184        return key
 185    elif isinstance(key, list):
 186        need_to_sort: bool = False
 187    elif isinstance(key, (set, frozenset)):
 188        need_to_sort = True
 189    elif isinstance(key, dict):
 190        key = tuple(key.items())
 191        need_to_sort = True
 192    else:
 193        raise WrongKeyTypeError(f'Wrong key type: {type(key)}: {key}')
 194
 195    new_key = list()
 196    for item in key:
 197        new_key.append(normalize_compound_key(item))
 198    
 199    if need_to_sort:
 200        new_key.sort()
 201    
 202    key = tuple(new_key)
 203
 204    return key
 205
 206
 207class DbRequest(ServiceRequest):
 208    def __init__(self, env_id: EnvId = None, db_id: DbId = None):
 209        super().__init__()
 210        self.env_id: EnvId = env_id
 211        self.db_id: DbId = db_id
 212        self.provide_to_request_handler = True
 213    
 214    def _copy(self) -> 'DbRequest':
 215        return DbRequest(self.env_id, self.db_id)
 216    
 217    def _get_db_id(self, db_id: DbId) -> DbId:
 218        if self.db_id is None:
 219            return db_id
 220        else:
 221            return self.db_id
 222    
 223    def set_default_db_environment_path(self, path_to_db_environment: str) -> bool:
 224        return self._save_to_copy(0, path_to_db_environment)
 225    
 226    def open_databases(self, db_names: Dict[DbId, DbName]) -> None:
 227        return self._save_to_copy(1, db_names)
 228    
 229    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
 230        return self._save_to_copy(2, db_id, delete)
 231    
 232    def sync(self) -> None:
 233        return self._save_to_copy(3)
 234    
 235    def get(self, key: InputKeyType, db_id: DbId = None) -> ValueType:
 236        return self._save_to_copy(4, key, db_id)
 237    
 238    def get_first(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]:
 239        # Returns first item in DB
 240        return self._save_to_copy(14, db_id)
 241    
 242    def get_last(self, db_id: DbId = None) -> Dict[NormalizedKeyType, ValueType]:
 243        # Returns last item in DB
 244        return self._save_to_copy(15, db_id)
 245    
 246    def get_items(self, db_keys: Sequence[InputKeyType], db_id: DbId = None) -> List[Tuple[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]:
 247        return self._save_to_copy(5, db_keys, db_id)
 248    
 249    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
 250        return self._save_to_copy(16, desired_key, num, db_id, reverse=False)
 251    
 252    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
 253        return self._save_to_copy(16, desired_key, num, db_id, reverse=True)
 254    
 255    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
 256        return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=False)
 257    
 258    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
 259        return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=True)
 260    
 261    def get_all_items(self, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
 262        # Returns all items from DB
 263        return self._save_to_copy(6, db_id)
 264    
 265    def put(self, key: InputKeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> RawValueType:
 266        return self._save_to_copy(7, key, value, db_id)
 267    
 268    def put_items(self, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
 269        return self._save_to_copy(8, db_items, db_id)
 270    
 271    def delete(self, key: InputKeyType, db_id: DbId = None) -> RawValueType:
 272        return self._save_to_copy(9, key, db_id)
 273    
 274    def delete_kv(self, key: InputKeyType, value: ValueType, db_id: DbId = None) -> RawValueType:
 275        return self._save_to_copy(10, key, value, db_id)
 276    
 277    def delete_items(self, db_items: Set[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
 278        return self._save_to_copy(11, db_items, db_id)
 279    
 280    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
 281        return self._save_to_copy(12, db_items, db_id)
 282
 283    def open_db_environment(self, path_to_db_environment: str) -> EnvId:
 284        return self._save_to_copy(13, path_to_db_environment)
 285    
 286    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
 287        # Lock all databases if db_names is None. Databases will be released automatically wnen coroutine execution will be finished
 288        return self._save_to_copy(18, db_names)
 289    
 290    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:
 291        # Tries to lock all databases if db_names is None. Returns True if try was successfull. False otherwise. Databases will be released automatically wnen coroutine execution will be finished
 292        return self._save_to_copy(19, db_names)
 293    
 294    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
 295        # Unlock all databases if db_names is None
 296        return self._save_to_copy(18, db_names)
 297    
 298
 299class Db(Service, EntityStatsMixin):
 300    def __init__(self, loop: CoroSchedulerType):
 301        super(Db, self).__init__(loop)
 302        self.default_db_id: DbName = b'__default__'
 303        self.default_env_name: str = '__default__.dbenv'
 304        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
 305        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
 306        self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict()
 307        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
 308        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
 309        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict()
 310        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
 311        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
 312        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
 313        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
 314        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
 315        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
 316        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 317        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 318        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
 319        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
 320        self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
 321        self.path_to_default_db_environment: str = None
 322        self.app_name_waiter: CoroWrapperBase = None
 323        self.default_db_environment: lmdb.Environment = None
 324        self.db_environments: Dict[EnvId, EnvInfo] = dict()
 325        # self.databases: Dict[Hashable, Any] = dict()
 326        # self.db_names: Dict[DbId, DbName] = dict()
 327        self.async_loop = None
 328        self.sync_time_interval = 1.0
 329        self.last_sync_time = perf_counter()
 330        self.force_sync: Set[EnvId] = set()
 331        self.write_locked: Set[EnvId] = set()
 332        self.writes_num: int = 0
 333        self.reads_num: int = 0
 334        self.deletes_num: int = 0
 335        self.db_drops_num: int = 0
 336        self.write_locked_coro_id: Set[CoroID] = set()
 337        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
 338        #                                    Tags.can_use_bytes,
 339        #                                    Tags.decode_str_as_str,
 340        #                                    Tags.decode_list_as_list,
 341        #                                    Tags.decode_bytes_as_bytes,
 342        #                                    Tags.superficial,
 343        #                                    Tags.current_platform,
 344        #                                    Tags.multi_platform),
 345        #                                   TestDataType.small,
 346        #                                   0.1)
 347        self.serializer = Serializer(Serializers.msgspec_messagepack)
 348
 349        self._request_workers = {
 350            0: self._on_set_default_db_environment_path,
 351            1: self._on_open_databases,
 352            2: self._on_drop_db,
 353            3: self._on_sync,
 354            4: self._on_get,
 355            5: self._on_get_items,
 356            6: self._on_get_all_items,
 357            7: self._on_put,
 358            8: self._on_put_items,
 359            9: self._on_delete,
 360            10: self._on_delete_kv,
 361            11: self._on_delete_items,
 362            12: self._on_delete_kv_items,
 363            13: self._on_open_db_environment,
 364            14: self._on_get_first,
 365            15: self._on_get_last,
 366            16: self._on_get_n_items,
 367            17: self._on_get_items_range,
 368        }
 369
 370    # TODO: sync with last implementation
 371    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 372        return type(self).__name__, {
 373            'db_names': list(self.db_names.keys()),
 374            'writes num': self.writes_num,
 375            'reads num': self.reads_num,
 376            'deletes num': self.deletes_num,
 377            'db drops num': self.db_drops_num,
 378        }
 379    
 380    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
 381        return normalize_compound_key(key)
 382
 383    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
 384        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
 385        if isinstance(env_or_id, lmdb.Environment):
 386            env = env_or_id
 387        else:
 388            env = self.db_environments[env_or_id].env
 389        
 390        if len(raw_key) > env.max_key_size():
 391            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
 392
 393    def single_task_registration_or_immediate_processing(
 394            self, *args, **kwargs) -> ServiceProcessingResponse:
 395        result = self.try_resolve_request(*args, **kwargs)
 396        if result is None:
 397            return True, None, None
 398        else:
 399            return result
 400
 401    def full_processing_iteration(self):
 402        if self.default_db_environment is None:
 403            if self.path_to_default_db_environment is None:
 404                if self.app_name_waiter is None:
 405                    async def coro(i: Interface, self: 'Db'):
 406                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
 407                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
 408                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
 409                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
 410                        self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name)
 411                        self.app_name_waiter = None
 412                        self.make_live()
 413                    
 414                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
 415                    self.app_name_waiter.is_background_coro = True
 416                
 417                self.make_dead()
 418                return
 419            else:
 420                self._init_default_db()
 421        
 422        if self.force_sync:
 423            envs_need_to_be_sync: Set[DbId] = self.force_sync
 424            self.force_sync = set()
 425        else:
 426            envs_need_to_be_sync = set()
 427        
 428        data_cache_buff = self.data_cache
 429        self.data_cache = type(data_cache_buff)()
 430        
 431        read_queue_buff = self.read_queue
 432        self.read_queue = type(read_queue_buff)()
 433        
 434        massive_read_queue_buff = self.massive_read_queue
 435        self.massive_read_queue = type(massive_read_queue_buff)()
 436        
 437        kv_deletion_cache_buff = self.kv_deletion_cache
 438        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
 439        
 440        get_all_items_queue_buff = self.get_all_items_queue
 441        self.get_all_items_queue = type(get_all_items_queue_buff)()
 442
 443        get_first_queue_buff = self.get_first_queue
 444        self.get_first_queue = type(get_first_queue_buff)()
 445
 446        get_last_queue_buff = self.get_last_queue
 447        self.get_last_queue = type(get_last_queue_buff)()
 448
 449        get_n_items_queue_buff = self.get_n_items_queue
 450        self.get_n_items_queue = type(get_n_items_queue_buff)()
 451
 452        get_items_range_queue_buff = self.get_items_range_queue
 453        self.get_items_range_queue = type(get_items_range_queue_buff)()
 454
 455        # put
 456        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]):
 457            try:
 458                with env_info.env.begin(write=True) as txn:
 459                    for db_id, db_put_info in put_info.items():
 460                        if db_id in env_info.databases:
 461                            for raw_key, value in db_put_info.items():
 462                                txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False)
 463                        
 464                        self.writes_num += len(db_put_info)
 465            except lmdb.MapFullError:
 466                raise DBError.from_exception(db_id)
 467        
 468        for env_id, put_info in data_cache_buff.items():
 469            if env_id in self.db_environments:
 470                envs_need_to_be_sync.add(env_id)
 471                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
 472
 473        # TODO: implement delete* methods processing
 474        # delete
 475        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
 476            if env_id in self.db_environments:
 477                envs_need_to_be_sync.add(env_id)
 478                ...
 479
 480        for key_info, value in kv_deletion_cache_buff.items():
 481            with self.default_db_environment.begin(write=True) as txn:
 482                key, db_id = key_info
 483                txn.delete(key, value, db=self.databases[db_id])
 484                self.deletes_num += 1
 485
 486        # drop
 487        drop_db_requests_buff = self.drop_db_requests
 488        self.drop_db_requests = type(drop_db_requests_buff)()
 489        
 490        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]):
 491            for db_id, db_drop_info in drop_info.items():
 492                delete_db, coro_id = db_drop_info
 493                if db_id in env_info.databases:
 494                    try:
 495                        with env_info.env.begin(write=True) as txn:
 496                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
 497                            if delete_db:
 498                                del env_info.databases[db_id]
 499                                del env_info.db_names[db_id]
 500                        
 501                        self.db_drops_num += 1
 502                    except lmdb.MapFullError:
 503                        raise DBError.from_exception(db_id)
 504                    
 505                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 506                else:
 507                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 508        
 509        for env_id, drop_info in drop_db_requests_buff.items():
 510            if env_id in self.db_environments:
 511                envs_need_to_be_sync.add(env_id)
 512                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
 513            else:
 514                for db_id, db_drop_info in drop_info.items():
 515                    delete_db, coro_id = db_drop_info
 516                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 517
 518        # get
 519        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]:
 520            key, db_id, env_id = key_info
 521            need_to_get_from_db = True
 522            try:
 523                value = data_cache_buff[env_id][db_id][key]
 524                need_to_get_from_db = False
 525            except KeyError:
 526                pass
 527            
 528            if need_to_get_from_db:
 529                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
 530                self.reads_num += 1
 531            
 532            exception = None
 533            try:
 534                if value is None:
 535                    exception = DbKeyError(key_info)
 536                else:
 537                    value = self.serializer.loads(value)
 538            except:
 539                exception = get_exception()
 540            
 541            return value, exception
 542        
 543        # _on_get
 544        for env_id, read_queue_buff_db_info in read_queue_buff.items():
 545            if env_id in self.db_environments:
 546                env_info = self.db_environments[env_id]
 547                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 548                    if db_id in env_info.databases:
 549                        with env_info.env.begin() as txn:
 550                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 551                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 552                                for coro_id in coro_ids:
 553                                    self.register_response(coro_id, value, exception)
 554                    else:
 555                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 556                            for coro_id in coro_ids:
 557                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 558            else:
 559                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 560                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 561                        for coro_id in coro_ids:
 562                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 563        
 564        # _on_get_items
 565        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
 566        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
 567            if env_id in self.db_environments:
 568                env_info = self.db_environments[env_id]
 569                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 570                    if coro_id not in results:
 571                        results[coro_id] = dict()
 572                    
 573                    coro_results = results[coro_id]
 574                    for db_id, raw_keys in read_queue_buff_db_info.items():
 575                        if db_id not in coro_results:
 576                            coro_results[db_id] = dict()
 577                        
 578                        coro_db_results = coro_results[db_id]
 579                        if db_id in env_info.databases:
 580                            with env_info.env.begin() as txn:
 581                                for raw_key in raw_keys:
 582                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 583                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
 584                        else:
 585                            for coro_id in coro_ids:
 586                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 587            else:
 588                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 589                    for db_id, raw_keys in read_queue_buff_db_info.items():
 590                        for coro_id in coro_ids:
 591                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 592        
 593        for coro_id, coro_results in results.items():
 594            self.register_response(coro_id, coro_results, None)
 595
 596        # get all items
 597        for coro_id, db_id, env_id in get_all_items_queue_buff:
 598            if env_id in self.db_environments:
 599                env_info = self.db_environments[env_id]
 600                env = env_info.env
 601                if db_id in env_info.databases:
 602                    db = env_info.databases[db_id]
 603                    with env.begin(db=db) as txn:
 604                        result = list()
 605                        exception = None
 606                        try:
 607                            result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()]
 608                            self.reads_num += len(result)
 609                        except:
 610                            exception = get_exception()
 611                        
 612                        self.register_response(coro_id, result, exception)
 613                else:
 614                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 615            else:
 616                self.register_response(coro_id, None, UnknownEnvError(env_id))
 617        
 618        # get_first
 619        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
 620            if env_id in self.db_environments:
 621                env_info = self.db_environments[env_id]
 622                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 623                    if db_id in env_info.databases:
 624                        db = env_info.databases[db_id]
 625                        with env_info.env.begin(db=db) as txn:
 626                            result = None
 627                            exception = None
 628                            try:
 629                                cursor: lmdb.Cursor = txn.cursor()
 630                                if cursor.first():
 631                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 632                                    self.reads_num += 1
 633                                else:
 634                                    exception = KeyError()
 635                            except:
 636                                exception = get_exception()
 637                            
 638                            for coro_id in coro_ids:
 639                                self.register_response(coro_id, result, exception)
 640                    else:
 641                        for coro_id in coro_ids:
 642                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 643            else:
 644                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 645                    for coro_id in coro_ids:
 646                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 647        
 648        # get_last
 649        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
 650            if env_id in self.db_environments:
 651                env_info = self.db_environments[env_id]
 652                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 653                    if db_id in env_info.databases:
 654                        db = env_info.databases[db_id]
 655                        with env_info.env.begin(db=db) as txn:
 656                            result = None
 657                            exception = None
 658                            try:
 659                                cursor: lmdb.Cursor = txn.cursor()
 660                                if cursor.first():
 661                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 662                                    self.reads_num += 1
 663                                else:
 664                                    exception = KeyError()
 665                            except:
 666                                exception = get_exception()
 667                            
 668                            for coro_id in coro_ids:
 669                                self.register_response(coro_id, result, exception)
 670                    else:
 671                        for coro_id in coro_ids:
 672                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 673            else:
 674                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 675                    for coro_id in coro_ids:
 676                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 677        
 678        # get_n_items
 679        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
 680            if env_id in self.db_environments:
 681                env_info = self.db_environments[env_id]
 682                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
 683                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
 684                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
 685                    if db_id in env_info.databases:
 686                        db = env_info.databases[db_id]
 687                        with env_info.env.begin(db=db) as txn:
 688                            result = None
 689                            exception = None
 690                            try:
 691                                cursor: lmdb.Cursor = txn.cursor()
 692                                if cursor.set_range(first_desired_raw_key):
 693                                    if reverse:
 694                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
 695                                    else:
 696                                        cursor_iterator = cursor.iternext(keys=True, values=True)
 697                                    
 698                                    for raw_key, raw_value in cursor_iterator:
 699                                        if (num_items is not None) and (num_items <= 0):
 700                                            break
 701                                        
 702                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
 703                                        self.reads_num += 1
 704                                        if num_items is not None:
 705                                            num_items -= 1
 706                                else:
 707                                    exception = KeyError()
 708                            except:
 709                                exception = get_exception()
 710                            
 711                            self.register_response(coro_id, result, exception)
 712                    else:
 713                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 714            else:
 715                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
 716                    for coro_id in coro_ids:
 717                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 718        
 719        # get_items_range
 720        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
 721            if env_id in self.db_environments:
 722                env_info = self.db_environments[env_id]
 723                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
 724                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
 725                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
 726                    if db_id in env_info.databases:
 727                        db = env_info.databases[db_id]
 728                        with env_info.env.begin(db=db) as txn:
 729                            result = None
 730                            exception = None
 731                            try:
 732                                cursor: lmdb.Cursor = txn.cursor()
 733                                if cursor.set_range(first_desired_raw_key):
 734                                    if reverse:
 735                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
 736                                    else:
 737                                        cursor_iterator = cursor.iternext(keys=True, values=True)
 738                                    
 739                                    for raw_key, raw_value in cursor_iterator:
 740                                        if reverse:
 741                                            if raw_key < last_desired_raw_key:
 742                                                break
 743                                        else:
 744                                            if raw_key > last_desired_raw_key:
 745                                                break
 746
 747                                        if (num_items is not None) and (num_items <= 0):
 748                                            break
 749
 750                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
 751                                        self.reads_num += 1
 752                                        if num_items is not None:
 753                                            num_items -= 1
 754                                else:
 755                                    exception = KeyError()
 756                            except:
 757                                exception = get_exception()
 758                            
 759                            self.register_response(coro_id, result, exception)
 760                    else:
 761                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 762            else:
 763                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
 764                    for coro_id in coro_ids:
 765                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 766        
 767        # sync
 768        if envs_need_to_be_sync:
 769            for env_id in envs_need_to_be_sync:
 770                self.sync_in_thread_pool(env_id)
 771            
 772            self.last_sync_time = perf_counter()
 773
 774        self.make_dead()
 775
 776    def in_work(self) -> bool:
 777        result: bool = (bool(self.get_first_queue) or bool(self.get_last_queue) or bool(self.get_n_items_queue) or bool(self.get_items_range_queue) or bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)) or (bool(self.force_sync) or ((not bool(self.write_locked)) and (bool(self.data_cache) or bool(self.kv_deletion_cache) or bool(self.drop_db_requests)) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
 778        return self.thrifty_in_work(result)
 779    
 780    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
 781        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
 782        if self.sync_time_interval > time_since_last_sync_time:
 783            return True, self.sync_time_interval - time_since_last_sync_time
 784        else:
 785            return True, 0
 786
 787    def _init_db(self, db_id: DbId):
 788        # print(f'{self.path_to_default_db_environment=}')
 789        if db_id in self.db_environments:
 790            return self.db_environments[db_id]
 791
 792        env_init_info: EnvInitInfo = EnvInitInfo(
 793            db_id, 
 794            map_size=20 * 1024**2, 
 795            writemap=True, 
 796            max_dbs=10,
 797            map_async=True, 
 798            lock=False, 
 799            metasync=False, 
 800            sync=False, 
 801            meminit=False,
 802        )
 803        env_info: EnvInfo = EnvInfo(env_init_info, lmdb.open(*env_init_info.args, **env_init_info.kwargs))
 804        db_environment = env_info.env
 805        self.db_environments[env_info.env_id] = env_info
 806        env_info.databases[self.default_db_id] = env_info.databases[None] = db_environment.open_db(self.default_db_id)
 807        db_environment.sync(True)
 808
 809    def _init_default_db(self):
 810        env_info: EnvInfo = self._init_db(self.path_to_default_db_environment)
 811        self.db_environments[None] = env_info
 812
 813    def _on_open_db_environment(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse:
 814        result: EnvInfo = None
 815        try:
 816            result = self._init_db(path_to_db_environment)
 817        except:
 818            exception = get_exception()
 819            return True, result, exception
 820        
 821        return True, result.env_id, None
 822
 823    def _on_set_default_db_environment_path(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse:
 824        if {None, path_to_db_environment} & self.write_locked:
 825            return True, False, None
 826        
 827        if self.default_db_environment is None:
 828            self.path_to_default_db_environment = path_to_db_environment
 829            try:
 830                self._init_default_db()
 831            except:
 832                exception = get_exception()
 833                return True, False, exception
 834            return True, True, None
 835        else:
 836            return True, False, None
 837    
 838    def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse:
 839        if self.data_cache:
 840            self.force_sync.add(request.env_id)
 841            self.make_live()
 842        else:
 843            # self.default_db_environment.sync(True)
 844            self.sync_in_thread_pool(request.env_id)
 845        
 846        return True, None, None
 847    
 848    def _on_get(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse:
 849        coro_id = self.current_caller_coro_info.coro_id
 850        key = self.serializer.dumps(normalize_compound_key(key))
 851        env_id = request.env_id
 852        db_id = request._get_db_id(db_id)
 853        
 854        if env_id in self.data_cache:
 855            env_cache = self.data_cache[env_id]
 856            if db_id in env_cache:
 857                db_cache = env_cache[db_id]
 858                if key in db_cache:
 859                    return True, db_cache[key], None
 860        
 861        if env_id not in self.read_queue:
 862            self.read_queue[env_id] = dict()
 863        
 864        read_queue_env = self.read_queue[env_id]
 865        if db_id not in read_queue_env:
 866            read_queue_env[db_id] = dict()
 867        
 868        read_queue_env_db = read_queue_env[db_id]
 869        if key not in read_queue_env_db:
 870            read_queue_env_db[key] = set()
 871        
 872        read_queue_env_db_key = read_queue_env_db[key]
 873        read_queue_env_db_key.add(coro_id)
 874        self.make_live()
 875        return False, None, None
 876    
 877    def _on_get_first(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse:
 878        coro_id = self.current_caller_coro_info.coro_id
 879        env_id = request.env_id
 880        db_id = request._get_db_id(db_id)
 881        
 882        if env_id not in self.get_first_queue:
 883            self.get_first_queue[env_id] = dict()
 884        
 885        get_first_queue_env = self.get_first_queue[env_id]
 886        if db_id not in get_first_queue_env:
 887            get_first_queue_env[db_id] = set()
 888        
 889        get_first_queue_env_db = get_first_queue_env[db_id]
 890        get_first_queue_env_db.add(coro_id)
 891        
 892        self.make_live()
 893        return False, None, None
 894    
 895    def _on_get_last(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse:
 896        coro_id = self.current_caller_coro_info.coro_id
 897        env_id = request.env_id
 898        db_id = request._get_db_id(db_id)
 899        
 900        if env_id not in self.get_last_queue:
 901            self.get_last_queue[env_id] = dict()
 902        
 903        get_last_queue_env = self.get_last_queue[env_id]
 904        if db_id not in get_last_queue_env:
 905            get_last_queue_env[db_id] = set()
 906        
 907        get_last_queue_env_db = get_last_queue_env[db_id]
 908        get_last_queue_env_db.add(coro_id)
 909        
 910        self.make_live()
 911        return False, None, None
 912    
 913    def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 914        coro_id = self.current_caller_coro_info.coro_id
 915        desired_key = self.serializer.dumps(normalize_compound_key(desired_key))
 916        db_id = request._get_db_id(db_id)
 917        env_id = request.env_id
 918
 919        if env_id not in self.get_n_items_queue:
 920            self.get_n_items_queue[env_id] = dict()
 921        
 922        get_n_items_queue_env = self.get_n_items_queue[env_id]
 923        get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse)
 924        self.make_live()
 925        return False, None, None
 926    
 927    def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 928        coro_id = self.current_caller_coro_info.coro_id
 929        first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key))
 930        last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key))
 931        db_id = request._get_db_id(db_id)
 932        env_id = request.env_id
 933
 934        if env_id not in self.get_items_range_queue:
 935            self.get_items_range_queue[env_id] = dict()
 936        
 937        get_items_range_queue_env = self.get_items_range_queue[env_id]
 938        get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse)
 939        self.make_live()
 940        return False, None, None
 941    
 942    def _on_get_items(self, request: DbRequest, db_keys: Union[Set[InputKeyType], Dict[DbId, Set[InputKeyType]]]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]:
 943        coro_id = self.current_caller_coro_info.coro_id
 944        env_id = request.env_id
 945
 946        if env_id not in self.massive_read_queue:
 947            self.massive_read_queue[env_id] = dict()
 948        
 949        massive_read_queue_env = self.massive_read_queue[env_id]
 950        if coro_id not in massive_read_queue_env:
 951            massive_read_queue_env[coro_id] = dict()
 952        
 953        massive_read_queue_env_coro = massive_read_queue_env[coro_id]
 954        if request.db_id is not None:
 955            db_keys = {request.db_id: db_keys}
 956        
 957        for db_id, keys in db_keys.items():
 958            if db_id not in massive_read_queue_env_coro:
 959                massive_read_queue_env_coro[db_id] = set()
 960            
 961            massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id]
 962            for key in keys:
 963                massive_read_queue_env_coro_db.add(self.serializer.dumps(normalize_compound_key(key)))
 964
 965        self.make_live()
 966        return False, None, None
 967    
 968    def _on_get_all_items(self, request: DbRequest, db_id: DbId) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 969        db_id = request._get_db_id(db_id)
 970        coro_id = self.current_caller_coro_info.coro_id
 971        self.get_all_items_queue.append((coro_id, db_id, request.env_id))
 972        self.make_live()
 973        return False, None, None
 974    
 975    def _on_put(self, request: DbRequest, key: KeyType, value: Any, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]:
 976        key = self.serializer.dumps(normalize_compound_key(key))
 977        env_id = request.env_id
 978        db_id = request._get_db_id(db_id)
 979        
 980        exception = None
 981        result = None
 982        try:
 983            if env_id not in self.data_cache:
 984                self.data_cache[env_id] = dict()
 985            
 986            env_data_cache = self.data_cache[env_id]
 987            if db_id not in env_data_cache:
 988                env_data_cache[db_id] = dict()
 989            
 990            env_data_cache_db = env_data_cache[db_id]
 991            result = env_data_cache_db[key] = self.serializer.dumps(value)
 992        except:
 993            exception = get_exception()
 994        
 995        self.make_live()
 996        return True, result, exception
 997    
 998    def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
 999        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1000        env_id = request.env_id
1001        db_id = request._get_db_id(db_id)
1002
1003        if env_id not in self.data_cache:
1004            self.data_cache[env_id] = dict()
1005        
1006        env_data_cache = self.data_cache[env_id]
1007        if db_id not in result_items:
1008            result_items[db_id] = dict()
1009        
1010        result_db_items = result_items[db_id]
1011        
1012        if db_id not in env_data_cache:
1013            env_data_cache[db_id] = dict()
1014        
1015        env_data_cache_db = env_data_cache[db_id]
1016        for key, value in db_items.items():
1017            key = self.serializer.dumps(normalize_compound_key(key))
1018            
1019            exception = None
1020            result = None
1021            try:
1022                result = env_data_cache_db[key] = self.serializer.dumps(value)
1023            except:
1024                exception = get_exception()
1025            
1026            result_db_items[key] = (result, exception)
1027        
1028        self.make_live()
1029        return True, result_items, None
1030    
1031    def _on_delete(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> Tuple[bool, None, Optional[Exception]]:
1032        key = self.serializer.dumps(normalize_compound_key(key))
1033        db_id = request._get_db_id(db_id)
1034        env_id = request.env_id
1035        
1036        exception = None
1037        result = None
1038        try:
1039            if env_id not in self.deletion_cache:
1040                self.deletion_cache[env_id] = dict()
1041            
1042            env_deletion_cache = self.deletion_cache[env_id]
1043            if db_id not in env_deletion_cache:
1044                env_deletion_cache[db_id] = set()
1045            
1046            env_deletion_cache_db = env_deletion_cache[db_id]
1047            env_deletion_cache_db.add(key)
1048        except:
1049            exception = get_exception()
1050        
1051        self.make_live()
1052        return True, result, exception
1053    
1054    def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]:
1055        key = self.serializer.dumps(normalize_compound_key(key))
1056        db_id = request._get_db_id(db_id)
1057        env_id = request.env_id
1058        
1059        exception = None
1060        result = None
1061        try:
1062            if env_id not in self.kv_deletion_cache:
1063                self.kv_deletion_cache[env_id] = dict()
1064            
1065            env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1066            if db_id not in env_kv_deletion_cache:
1067                env_kv_deletion_cache[db_id] = dict()
1068            
1069            env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1070            result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value)
1071        except:
1072            exception = get_exception()
1073        
1074        self.make_live()
1075        return True, result, exception
1076    
1077    def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]:
1078        result_items: Dict[DbId, Dict[RawKeyType, Optional[Exception]]] = dict()
1079        env_id = request.env_id
1080
1081        if env_id not in self.deletion_cache:
1082            self.deletion_cache[env_id] = dict()
1083        
1084        env_deletion_cache = self.deletion_cache[env_id]
1085        if db_id not in result_items:
1086            result_items[db_id] = dict()
1087        
1088        result_db_items = result_items[db_id]
1089        
1090        if db_id not in env_deletion_cache:
1091            env_deletion_cache[db_id] = set()
1092        
1093        env_deletion_cache_db = env_deletion_cache[db_id]
1094        for key in db_items:
1095            key = self.serializer.dumps(normalize_compound_key(key))
1096            
1097            exception = None
1098            try:
1099                env_deletion_cache_db.add(key)
1100            except:
1101                exception = get_exception()
1102            
1103            result_db_items[key] = exception
1104
1105        self.make_live()
1106        return True, result_items, None
1107    
1108    def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1109        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1110        env_id = request.env_id
1111
1112        if env_id not in self.kv_deletion_cache:
1113            self.kv_deletion_cache[env_id] = dict()
1114        
1115        env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1116        if db_id not in result_items:
1117            result_items[db_id] = dict()
1118        
1119        result_db_items = result_items[db_id]
1120        
1121        if db_id not in env_kv_deletion_cache:
1122            env_kv_deletion_cache[db_id] = dict()
1123        
1124        env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1125        for key, value in db_items.items():
1126            exception = None
1127            result = None
1128            try:
1129                key = self.serializer.dumps(normalize_compound_key(key))
1130                result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value)
1131            except:
1132                exception = get_exception()
1133            
1134            result_db_items[key] = (result, exception)
1135        
1136        self.make_live()
1137        return True, result_items, None
1138    
1139    def _on_open_databases(self, request: DbRequest, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse:
1140        exception = None
1141        try:
1142            env_info: EnvInfo = self.db_environments[request.env_id]
1143        except KeyError:
1144            exception = UnknownEnvError(request.env_id)
1145        
1146        for db_id, db_name in db_names.items():
1147            env_info.databases[db_id] = env_info.env.open_db(db_name)
1148            env_info.db_names[db_id] = db_name
1149        
1150        env_info.env.sync(True)
1151        return True, None, exception
1152    
1153    def _on_drop_db(self, request: DbRequest, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse:
1154        coro_id = self.current_caller_coro_info.coro_id
1155        env_id = request.env_id
1156
1157        if env_id not in self.drop_db_requests:
1158            self.drop_db_requests[env_id] = dict()
1159        
1160        drop_db_requests_env = self.drop_db_requests[env_id]
1161
1162        if db_id not in drop_db_requests_env:
1163            drop_db_requests_env[db_id] = [False, set()]
1164        
1165        drop_db_requests_env_db = drop_db_requests_env[db_id]
1166
1167        if delete:
1168            drop_db_requests_env_db[0] = delete
1169        
1170        drop_db_requests_env_db[1].add(coro_id)
1171        self.make_live()
1172        return False, None, None
1173    
1174    def sync_in_thread_pool(self, env_id: EnvId = None):
1175        async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
1176            if need_to_ensure_asyncio_loop:
1177                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
1178            else:
1179                if asyncio_loop is None:
1180                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get())
1181            
1182            async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
1183                def sync_worker():
1184                    env.sync(True)
1185                    self.write_locked.discard(env_id)
1186                
1187                await task_in_thread_pool(asyncio_loop, sync_worker)
1188
1189            env: lmdb.Environment = self.db_environments[env_id].env
1190            await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env)))
1191            self.write_locked_coro_id.discard(i.coro_id)
1192            def make_service_live_for_a_next_sync(self: 'Db'):
1193                self.make_live()
1194            
1195            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)
1196
1197        asyncio_loop = None
1198        need_to_ensure_asyncio_loop = False
1199        try:
1200            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
1201        except AsyncioLoopWasNotSetError:
1202            need_to_ensure_asyncio_loop = True
1203
1204        coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, env_id, self, asyncio_loop, need_to_ensure_asyncio_loop)
1205        coro.is_background_coro = True
1206        self.write_locked.add(env_id)
1207        self.write_locked_coro_id.add(coro.coro_id)
1208
1209
# Bind Db as the ``default_service_type`` of DbRequest (presumably so that a
# bare DbRequest is routed to this service - confirm against the request base
# class).  Done here because Db has to be defined before it can be referenced.
DbRequest.default_service_type = Db
1211
1212
def lmdb_reapplier(env_info: EnvInfo, handler: Callable, *args, **kwargs):
    """Run ``handler`` against an environment, growing the map when it is full.

    ``handler(environment, databases, *args, **kwargs)`` is called; if it
    raises a ``DBError`` whose ``original_exception`` is an
    ``lmdb.MapFullError`` the map size is enlarged by 2 MiB and the handler is
    called again, as many times as needed.

    Args:
        env_info: Provides ``env`` (the LMDB environment) and ``databases``.
        handler: Callable performing the actual DB work.
        *args: Extra positional arguments for ``handler``.
        **kwargs: Extra keyword arguments for ``handler``.

    Raises:
        DBError: When the handler fails for any reason other than a full map.
            Such errors used to be swallowed and retried forever.
    """
    environment: lmdb.Environment = env_info.env
    databases: Dict[Hashable, Any] = env_info.databases
    while True:
        try:
            handler(environment, databases, *args, **kwargs)
            return
        except DBError as err:
            if not isinstance(err.original_exception, lmdb.MapFullError):
                # Retrying cannot help here; surface the error to the caller.
                raise

        # Map is full: enlarge it by 2 MiB and re-apply the handler.
        environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
class Db(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
 300class Db(Service, EntityStatsMixin):
 301    def __init__(self, loop: CoroSchedulerType):
 302        super(Db, self).__init__(loop)
 303        self.default_db_id: DbName = b'__default__'
 304        self.default_env_name: str = '__default__.dbenv'
 305        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
 306        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
 307        self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict()
 308        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
 309        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
 310        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict()
 311        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
 312        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
 313        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
 314        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
 315        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
 316        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
 317        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 318        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
 319        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
 320        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
 321        self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
 322        self.path_to_default_db_environment: str = None
 323        self.app_name_waiter: CoroWrapperBase = None
 324        self.default_db_environment: lmdb.Environment = None
 325        self.db_environments: Dict[EnvId, EnvInfo] = dict()
 326        # self.databases: Dict[Hashable, Any] = dict()
 327        # self.db_names: Dict[DbId, DbName] = dict()
 328        self.async_loop = None
 329        self.sync_time_interval = 1.0
 330        self.last_sync_time = perf_counter()
 331        self.force_sync: Set[EnvId] = set()
 332        self.write_locked: Set[EnvId] = set()
 333        self.writes_num: int = 0
 334        self.reads_num: int = 0
 335        self.deletes_num: int = 0
 336        self.db_drops_num: int = 0
 337        self.write_locked_coro_id: Set[CoroID] = set()
 338        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
 339        #                                    Tags.can_use_bytes,
 340        #                                    Tags.decode_str_as_str,
 341        #                                    Tags.decode_list_as_list,
 342        #                                    Tags.decode_bytes_as_bytes,
 343        #                                    Tags.superficial,
 344        #                                    Tags.current_platform,
 345        #                                    Tags.multi_platform),
 346        #                                   TestDataType.small,
 347        #                                   0.1)
 348        self.serializer = Serializer(Serializers.msgspec_messagepack)
 349
 350        self._request_workers = {
 351            0: self._on_set_default_db_environment_path,
 352            1: self._on_open_databases,
 353            2: self._on_drop_db,
 354            3: self._on_sync,
 355            4: self._on_get,
 356            5: self._on_get_items,
 357            6: self._on_get_all_items,
 358            7: self._on_put,
 359            8: self._on_put_items,
 360            9: self._on_delete,
 361            10: self._on_delete_kv,
 362            11: self._on_delete_items,
 363            12: self._on_delete_kv_items,
 364            13: self._on_open_db_environment,
 365            14: self._on_get_first,
 366            15: self._on_get_last,
 367            16: self._on_get_n_items,
 368            17: self._on_get_items_range,
 369        }
 370
 371    # TODO: sync with last implementation
 372    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
 373        return type(self).__name__, {
 374            'db_names': list(self.db_names.keys()),
 375            'writes num': self.writes_num,
 376            'reads num': self.reads_num,
 377            'deletes num': self.deletes_num,
 378            'db drops num': self.db_drops_num,
 379        }
 380    
 381    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
 382        return normalize_compound_key(key)
 383
 384    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
 385        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
 386        if isinstance(env_or_id, lmdb.Environment):
 387            env = env_or_id
 388        else:
 389            env = self.db_environments[env_or_id].env
 390        
 391        if len(raw_key) > env.max_key_size():
 392            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
 393
 394    def single_task_registration_or_immediate_processing(
 395            self, *args, **kwargs) -> ServiceProcessingResponse:
 396        result = self.try_resolve_request(*args, **kwargs)
 397        if result is None:
 398            return True, None, None
 399        else:
 400            return result
 401
 402    def full_processing_iteration(self):
 403        if self.default_db_environment is None:
 404            if self.path_to_default_db_environment is None:
 405                if self.app_name_waiter is None:
 406                    async def coro(i: Interface, self: 'Db'):
 407                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
 408                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
 409                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
 410                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
 411                        self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name)
 412                        self.app_name_waiter = None
 413                        self.make_live()
 414                    
 415                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
 416                    self.app_name_waiter.is_background_coro = True
 417                
 418                self.make_dead()
 419                return
 420            else:
 421                self._init_default_db()
 422        
 423        if self.force_sync:
 424            envs_need_to_be_sync: Set[DbId] = self.force_sync
 425            self.force_sync = set()
 426        else:
 427            envs_need_to_be_sync = set()
 428        
 429        data_cache_buff = self.data_cache
 430        self.data_cache = type(data_cache_buff)()
 431        
 432        read_queue_buff = self.read_queue
 433        self.read_queue = type(read_queue_buff)()
 434        
 435        massive_read_queue_buff = self.massive_read_queue
 436        self.massive_read_queue = type(massive_read_queue_buff)()
 437        
 438        kv_deletion_cache_buff = self.kv_deletion_cache
 439        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
 440        
 441        get_all_items_queue_buff = self.get_all_items_queue
 442        self.get_all_items_queue = type(get_all_items_queue_buff)()
 443
 444        get_first_queue_buff = self.get_first_queue
 445        self.get_first_queue = type(get_first_queue_buff)()
 446
 447        get_last_queue_buff = self.get_last_queue
 448        self.get_last_queue = type(get_last_queue_buff)()
 449
 450        get_n_items_queue_buff = self.get_n_items_queue
 451        self.get_n_items_queue = type(get_n_items_queue_buff)()
 452
 453        get_items_range_queue_buff = self.get_items_range_queue
 454        self.get_items_range_queue = type(get_items_range_queue_buff)()
 455
 456        # put
 457        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]):
 458            try:
 459                with env_info.env.begin(write=True) as txn:
 460                    for db_id, db_put_info in put_info.items():
 461                        if db_id in env_info.databases:
 462                            for raw_key, value in db_put_info.items():
 463                                txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False)
 464                        
 465                        self.writes_num += len(db_put_info)
 466            except lmdb.MapFullError:
 467                raise DBError.from_exception(db_id)
 468        
 469        for env_id, put_info in data_cache_buff.items():
 470            if env_id in self.db_environments:
 471                envs_need_to_be_sync.add(env_id)
 472                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
 473
 474        # TODO: implement delete* methods processing
 475        # delete
 476        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
 477            if env_id in self.db_environments:
 478                envs_need_to_be_sync.add(env_id)
 479                ...
 480
 481        for key_info, value in kv_deletion_cache_buff.items():
 482            with self.default_db_environment.begin(write=True) as txn:
 483                key, db_id = key_info
 484                txn.delete(key, value, db=self.databases[db_id])
 485                self.deletes_num += 1
 486
 487        # drop
 488        drop_db_requests_buff = self.drop_db_requests
 489        self.drop_db_requests = type(drop_db_requests_buff)()
 490        
 491        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]):
 492            for db_id, db_drop_info in drop_info.items():
 493                delete_db, coro_id = db_drop_info
 494                if db_id in env_info.databases:
 495                    try:
 496                        with env_info.env.begin(write=True) as txn:
 497                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
 498                            if delete_db:
 499                                del env_info.databases[db_id]
 500                                del env_info.db_names[db_id]
 501                        
 502                        self.db_drops_num += 1
 503                    except lmdb.MapFullError:
 504                        raise DBError.from_exception(db_id)
 505                    
 506                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 507                else:
 508                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 509        
 510        for env_id, drop_info in drop_db_requests_buff.items():
 511            if env_id in self.db_environments:
 512                envs_need_to_be_sync.add(env_id)
 513                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
 514            else:
 515                for db_id, db_drop_info in drop_info.items():
 516                    delete_db, coro_id = db_drop_info
 517                    self.register_response(coro_id, None, UnknownEnvError(env_id))
 518
 519        # get
 520        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]:
 521            key, db_id, env_id = key_info
 522            need_to_get_from_db = True
 523            try:
 524                value = data_cache_buff[env_id][db_id][key]
 525                need_to_get_from_db = False
 526            except KeyError:
 527                pass
 528            
 529            if need_to_get_from_db:
 530                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
 531                self.reads_num += 1
 532            
 533            exception = None
 534            try:
 535                if value is None:
 536                    exception = DbKeyError(key_info)
 537                else:
 538                    value = self.serializer.loads(value)
 539            except:
 540                exception = get_exception()
 541            
 542            return value, exception
 543        
 544        # _on_get
 545        for env_id, read_queue_buff_db_info in read_queue_buff.items():
 546            if env_id in self.db_environments:
 547                env_info = self.db_environments[env_id]
 548                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 549                    if db_id in env_info.databases:
 550                        with env_info.env.begin() as txn:
 551                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 552                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 553                                for coro_id in coro_ids:
 554                                    self.register_response(coro_id, value, exception)
 555                    else:
 556                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 557                            for coro_id in coro_ids:
 558                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 559            else:
 560                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
 561                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
 562                        for coro_id in coro_ids:
 563                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 564        
 565        # _on_get_items
 566        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
 567        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
 568            if env_id in self.db_environments:
 569                env_info = self.db_environments[env_id]
 570                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 571                    if coro_id not in results:
 572                        results[coro_id] = dict()
 573                    
 574                    coro_results = results[coro_id]
 575                    for db_id, raw_keys in read_queue_buff_db_info.items():
 576                        if db_id not in coro_results:
 577                            coro_results[db_id] = dict()
 578                        
 579                        coro_db_results = coro_results[db_id]
 580                        if db_id in env_info.databases:
 581                            with env_info.env.begin() as txn:
 582                                for raw_key in raw_keys:
 583                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
 584                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
 585                        else:
 586                            for coro_id in coro_ids:
 587                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 588            else:
 589                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
 590                    for db_id, raw_keys in read_queue_buff_db_info.items():
 591                        for coro_id in coro_ids:
 592                            self.register_response(coro_id, None, UnknownEnvError(env_id))
 593        
 594        for coro_id, coro_results in results.items():
 595            self.register_response(coro_id, coro_results, None)
 596
 597        # get all items
 598        for coro_id, db_id, env_id in get_all_items_queue_buff:
 599            if env_id in self.db_environments:
 600                env_info = self.db_environments[env_id]
 601                env = env_info.env
 602                if db_id in env_info.databases:
 603                    db = env_info.databases[db_id]
 604                    with env.begin(db=db) as txn:
 605                        result = list()
 606                        exception = None
 607                        try:
 608                            result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()]
 609                            self.reads_num += len(result)
 610                        except:
 611                            exception = get_exception()
 612                        
 613                        self.register_response(coro_id, result, exception)
 614                else:
 615                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 616            else:
 617                self.register_response(coro_id, None, UnknownEnvError(env_id))
 618        
 619        # get_first
 620        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
 621            if env_id in self.db_environments:
 622                env_info = self.db_environments[env_id]
 623                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 624                    if db_id in env_info.databases:
 625                        db = env_info.databases[db_id]
 626                        with env_info.env.begin(db=db) as txn:
 627                            result = None
 628                            exception = None
 629                            try:
 630                                cursor: lmdb.Cursor = txn.cursor()
 631                                if cursor.first():
 632                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 633                                    self.reads_num += 1
 634                                else:
 635                                    exception = KeyError()
 636                            except:
 637                                exception = get_exception()
 638                            
 639                            for coro_id in coro_ids:
 640                                self.register_response(coro_id, result, exception)
 641                    else:
 642                        for coro_id in coro_ids:
 643                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 644            else:
 645                for db_id, coro_ids in get_first_queue_buff_db_info.items():
 646                    for coro_id in coro_ids:
 647                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 648        
 649        # get_last
 650        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
 651            if env_id in self.db_environments:
 652                env_info = self.db_environments[env_id]
 653                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 654                    if db_id in env_info.databases:
 655                        db = env_info.databases[db_id]
 656                        with env_info.env.begin(db=db) as txn:
 657                            result = None
 658                            exception = None
 659                            try:
 660                                cursor: lmdb.Cursor = txn.cursor()
 661                                if cursor.first():
 662                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
 663                                    self.reads_num += 1
 664                                else:
 665                                    exception = KeyError()
 666                            except:
 667                                exception = get_exception()
 668                            
 669                            for coro_id in coro_ids:
 670                                self.register_response(coro_id, result, exception)
 671                    else:
 672                        for coro_id in coro_ids:
 673                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 674            else:
 675                for db_id, coro_ids in get_last_queue_buff_db_info.items():
 676                    for coro_id in coro_ids:
 677                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 678        
 679        # get_n_items
 680        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
 681            if env_id in self.db_environments:
 682                env_info = self.db_environments[env_id]
 683                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
 684                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
 685                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
 686                    if db_id in env_info.databases:
 687                        db = env_info.databases[db_id]
 688                        with env_info.env.begin(db=db) as txn:
 689                            result = None
 690                            exception = None
 691                            try:
 692                                cursor: lmdb.Cursor = txn.cursor()
 693                                if cursor.set_range(first_desired_raw_key):
 694                                    if reverse:
 695                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
 696                                    else:
 697                                        cursor_iterator = cursor.iternext(keys=True, values=True)
 698                                    
 699                                    for raw_key, raw_value in cursor_iterator:
 700                                        if (num_items is not None) and (num_items <= 0):
 701                                            break
 702                                        
 703                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
 704                                        self.reads_num += 1
 705                                        if num_items is not None:
 706                                            num_items -= 1
 707                                else:
 708                                    exception = KeyError()
 709                            except:
 710                                exception = get_exception()
 711                            
 712                            self.register_response(coro_id, result, exception)
 713                    else:
 714                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 715            else:
 716                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
 717                    for coro_id in coro_ids:
 718                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 719        
 720        # get_items_range
 721        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
 722            if env_id in self.db_environments:
 723                env_info = self.db_environments[env_id]
 724                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
 725                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
 726                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
 727                    if db_id in env_info.databases:
 728                        db = env_info.databases[db_id]
 729                        with env_info.env.begin(db=db) as txn:
 730                            result = None
 731                            exception = None
 732                            try:
 733                                cursor: lmdb.Cursor = txn.cursor()
 734                                if cursor.set_range(first_desired_raw_key):
 735                                    if reverse:
 736                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
 737                                    else:
 738                                        cursor_iterator = cursor.iternext(keys=True, values=True)
 739                                    
 740                                    for raw_key, raw_value in cursor_iterator:
 741                                        if reverse:
 742                                            if raw_key < last_desired_raw_key:
 743                                                break
 744                                        else:
 745                                            if raw_key > last_desired_raw_key:
 746                                                break
 747
 748                                        if (num_items is not None) and (num_items <= 0):
 749                                            break
 750
 751                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
 752                                        self.reads_num += 1
 753                                        if num_items is not None:
 754                                            num_items -= 1
 755                                else:
 756                                    exception = KeyError()
 757                            except:
 758                                exception = get_exception()
 759                            
 760                            self.register_response(coro_id, result, exception)
 761                    else:
 762                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
 763            else:
 764                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
 765                    for coro_id in coro_ids:
 766                        self.register_response(coro_id, None, UnknownEnvError(env_id))
 767        
 768        # sync
 769        if envs_need_to_be_sync:
 770            for env_id in envs_need_to_be_sync:
 771                self.sync_in_thread_pool(env_id)
 772            
 773            self.last_sync_time = perf_counter()
 774
 775        self.make_dead()
 776
 777    def in_work(self) -> bool:
 778        result: bool = (bool(self.get_first_queue) or bool(self.get_last_queue) or bool(self.get_n_items_queue) or bool(self.get_items_range_queue) or bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)) or (bool(self.force_sync) or ((not bool(self.write_locked)) and (bool(self.data_cache) or bool(self.kv_deletion_cache) or bool(self.drop_db_requests)) and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
 779        return self.thrifty_in_work(result)
 780    
 781    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
 782        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
 783        if self.sync_time_interval > time_since_last_sync_time:
 784            return True, self.sync_time_interval - time_since_last_sync_time
 785        else:
 786            return True, 0
 787
 788    def _init_db(self, db_id: DbId):
 789        # print(f'{self.path_to_default_db_environment=}')
 790        if db_id in self.db_environments:
 791            return self.db_environments[db_id]
 792
 793        env_init_info: EnvInitInfo = EnvInitInfo(
 794            db_id, 
 795            map_size=20 * 1024**2, 
 796            writemap=True, 
 797            max_dbs=10,
 798            map_async=True, 
 799            lock=False, 
 800            metasync=False, 
 801            sync=False, 
 802            meminit=False,
 803        )
 804        env_info: EnvInfo = EnvInfo(env_init_info, lmdb.open(*env_init_info.args, **env_init_info.kwargs))
 805        db_environment = env_info.env
 806        self.db_environments[env_info.env_id] = env_info
 807        env_info.databases[self.default_db_id] = env_info.databases[None] = db_environment.open_db(self.default_db_id)
 808        db_environment.sync(True)
 809
 810    def _init_default_db(self):
 811        env_info: EnvInfo = self._init_db(self.path_to_default_db_environment)
 812        self.db_environments[None] = env_info
 813
 814    def _on_open_db_environment(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse:
 815        result: EnvInfo = None
 816        try:
 817            result = self._init_db(path_to_db_environment)
 818        except:
 819            exception = get_exception()
 820            return True, result, exception
 821        
 822        return True, result.env_id, None
 823
 824    def _on_set_default_db_environment_path(self, request: DbRequest, path_to_db_environment: str) -> ServiceProcessingResponse:
 825        if {None, path_to_db_environment} & self.write_locked:
 826            return True, False, None
 827        
 828        if self.default_db_environment is None:
 829            self.path_to_default_db_environment = path_to_db_environment
 830            try:
 831                self._init_default_db()
 832            except:
 833                exception = get_exception()
 834                return True, False, exception
 835            return True, True, None
 836        else:
 837            return True, False, None
 838    
 839    def _on_sync(self, request: DbRequest) -> ServiceProcessingResponse:
 840        if self.data_cache:
 841            self.force_sync.add(request.env_id)
 842            self.make_live()
 843        else:
 844            # self.default_db_environment.sync(True)
 845            self.sync_in_thread_pool(request.env_id)
 846        
 847        return True, None, None
 848    
 849    def _on_get(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse:
 850        coro_id = self.current_caller_coro_info.coro_id
 851        key = self.serializer.dumps(normalize_compound_key(key))
 852        env_id = request.env_id
 853        db_id = request._get_db_id(db_id)
 854        
 855        if env_id in self.data_cache:
 856            env_cache = self.data_cache[env_id]
 857            if db_id in env_cache:
 858                db_cache = env_cache[db_id]
 859                if key in db_cache:
 860                    return True, db_cache[key], None
 861        
 862        if env_id not in self.read_queue:
 863            self.read_queue[env_id] = dict()
 864        
 865        read_queue_env = self.read_queue[env_id]
 866        if db_id not in read_queue_env:
 867            read_queue_env[db_id] = dict()
 868        
 869        read_queue_env_db = read_queue_env[db_id]
 870        if key not in read_queue_env_db:
 871            read_queue_env_db[key] = set()
 872        
 873        read_queue_env_db_key = read_queue_env_db[key]
 874        read_queue_env_db_key.add(coro_id)
 875        self.make_live()
 876        return False, None, None
 877    
 878    def _on_get_first(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse:
 879        coro_id = self.current_caller_coro_info.coro_id
 880        env_id = request.env_id
 881        db_id = request._get_db_id(db_id)
 882        
 883        if env_id not in self.get_first_queue:
 884            self.get_first_queue[env_id] = dict()
 885        
 886        get_first_queue_env = self.get_first_queue[env_id]
 887        if db_id not in get_first_queue_env:
 888            get_first_queue_env[db_id] = set()
 889        
 890        get_first_queue_env_db = get_first_queue_env[db_id]
 891        get_first_queue_env_db.add(coro_id)
 892        
 893        self.make_live()
 894        return False, None, None
 895    
 896    def _on_get_last(self, request: DbRequest, db_id: DbId = None) -> ServiceProcessingResponse:
 897        coro_id = self.current_caller_coro_info.coro_id
 898        env_id = request.env_id
 899        db_id = request._get_db_id(db_id)
 900        
 901        if env_id not in self.get_last_queue:
 902            self.get_last_queue[env_id] = dict()
 903        
 904        get_last_queue_env = self.get_last_queue[env_id]
 905        if db_id not in get_last_queue_env:
 906            get_last_queue_env[db_id] = set()
 907        
 908        get_last_queue_env_db = get_last_queue_env[db_id]
 909        get_last_queue_env_db.add(coro_id)
 910        
 911        self.make_live()
 912        return False, None, None
 913    
 914    def _on_get_n_items(self, request: DbRequest, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 915        coro_id = self.current_caller_coro_info.coro_id
 916        desired_key = self.serializer.dumps(normalize_compound_key(desired_key))
 917        db_id = request._get_db_id(db_id)
 918        env_id = request.env_id
 919
 920        if env_id not in self.get_n_items_queue:
 921            self.get_n_items_queue[env_id] = dict()
 922        
 923        get_n_items_queue_env = self.get_n_items_queue[env_id]
 924        get_n_items_queue_env[coro_id] = (db_id, desired_key, num, reverse)
 925        self.make_live()
 926        return False, None, None
 927    
 928    def _on_get_items_range(self, request: DbRequest, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None, reverse: bool = False) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 929        coro_id = self.current_caller_coro_info.coro_id
 930        first_desired_key = self.serializer.dumps(normalize_compound_key(first_desired_key))
 931        last_desired_key = self.serializer.dumps(normalize_compound_key(last_desired_key))
 932        db_id = request._get_db_id(db_id)
 933        env_id = request.env_id
 934
 935        if env_id not in self.get_items_range_queue:
 936            self.get_items_range_queue[env_id] = dict()
 937        
 938        get_items_range_queue_env = self.get_items_range_queue[env_id]
 939        get_items_range_queue_env[coro_id] = (db_id, first_desired_key, last_desired_key, num, reverse)
 940        self.make_live()
 941        return False, None, None
 942    
 943    def _on_get_items(self, request: DbRequest, db_keys: Union[Set[InputKeyType], Dict[DbId, Set[InputKeyType]]]) -> Tuple[bool, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]], Optional[Exception]]:
 944        coro_id = self.current_caller_coro_info.coro_id
 945        env_id = request.env_id
 946
 947        if env_id not in self.massive_read_queue:
 948            self.massive_read_queue[env_id] = dict()
 949        
 950        massive_read_queue_env = self.massive_read_queue[env_id]
 951        if coro_id not in massive_read_queue_env:
 952            massive_read_queue_env[coro_id] = dict()
 953        
 954        massive_read_queue_env_coro = massive_read_queue_env[coro_id]
 955        if request.db_id is not None:
 956            db_keys = {request.db_id: db_keys}
 957        
 958        for db_id, keys in db_keys.items():
 959            if db_id not in massive_read_queue_env_coro:
 960                massive_read_queue_env_coro[db_id] = set()
 961            
 962            massive_read_queue_env_coro_db = massive_read_queue_env_coro[db_id]
 963            for key in keys:
 964                massive_read_queue_env_coro_db.add(self.serializer.dumps(normalize_compound_key(key)))
 965
 966        self.make_live()
 967        return False, None, None
 968    
 969    def _on_get_all_items(self, request: DbRequest, db_id: DbId) -> Tuple[bool, List[Tuple[NormalizedKeyType, ValueType]], Optional[Exception]]:
 970        db_id = request._get_db_id(db_id)
 971        coro_id = self.current_caller_coro_info.coro_id
 972        self.get_all_items_queue.append((coro_id, db_id, request.env_id))
 973        self.make_live()
 974        return False, None, None
 975    
 976    def _on_put(self, request: DbRequest, key: KeyType, value: Any, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]:
 977        key = self.serializer.dumps(normalize_compound_key(key))
 978        env_id = request.env_id
 979        db_id = request._get_db_id(db_id)
 980        
 981        exception = None
 982        result = None
 983        try:
 984            if env_id not in self.data_cache:
 985                self.data_cache[env_id] = dict()
 986            
 987            env_data_cache = self.data_cache[env_id]
 988            if db_id not in env_data_cache:
 989                env_data_cache[db_id] = dict()
 990            
 991            env_data_cache_db = env_data_cache[db_id]
 992            result = env_data_cache_db[key] = self.serializer.dumps(value)
 993        except:
 994            exception = get_exception()
 995        
 996        self.make_live()
 997        return True, result, exception
 998    
 999    def _on_put_items(self, request: DbRequest, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1000        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1001        env_id = request.env_id
1002        db_id = request._get_db_id(db_id)
1003
1004        if env_id not in self.data_cache:
1005            self.data_cache[env_id] = dict()
1006        
1007        env_data_cache = self.data_cache[env_id]
1008        if db_id not in result_items:
1009            result_items[db_id] = dict()
1010        
1011        result_db_items = result_items[db_id]
1012        
1013        if db_id not in env_data_cache:
1014            env_data_cache[db_id] = dict()
1015        
1016        env_data_cache_db = env_data_cache[db_id]
1017        for key, value in db_items.items():
1018            key = self.serializer.dumps(normalize_compound_key(key))
1019            
1020            exception = None
1021            result = None
1022            try:
1023                result = env_data_cache_db[key] = self.serializer.dumps(value)
1024            except:
1025                exception = get_exception()
1026            
1027            result_db_items[key] = (result, exception)
1028        
1029        self.make_live()
1030        return True, result_items, None
1031    
1032    def _on_delete(self, request: DbRequest, key: KeyType, db_id: DbId = None) -> Tuple[bool, None, Optional[Exception]]:
1033        key = self.serializer.dumps(normalize_compound_key(key))
1034        db_id = request._get_db_id(db_id)
1035        env_id = request.env_id
1036        
1037        exception = None
1038        result = None
1039        try:
1040            if env_id not in self.deletion_cache:
1041                self.deletion_cache[env_id] = dict()
1042            
1043            env_deletion_cache = self.deletion_cache[env_id]
1044            if db_id not in env_deletion_cache:
1045                env_deletion_cache[db_id] = set()
1046            
1047            env_deletion_cache_db = env_deletion_cache[db_id]
1048            env_deletion_cache_db.add(key)
1049        except:
1050            exception = get_exception()
1051        
1052        self.make_live()
1053        return True, result, exception
1054    
1055    def _on_delete_kv(self, request: DbRequest, key: InputKeyType, value: ValueType, db_id: DbId = None) -> Tuple[bool, RawValueType, Optional[Exception]]:
1056        key = self.serializer.dumps(normalize_compound_key(key))
1057        db_id = request._get_db_id(db_id)
1058        env_id = request.env_id
1059        
1060        exception = None
1061        result = None
1062        try:
1063            if env_id not in self.kv_deletion_cache:
1064                self.kv_deletion_cache[env_id] = dict()
1065            
1066            env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1067            if db_id not in env_kv_deletion_cache:
1068                env_kv_deletion_cache[db_id] = dict()
1069            
1070            env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1071            result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value)
1072        except:
1073            exception = get_exception()
1074        
1075        self.make_live()
1076        return True, result, exception
1077    
1078    def _on_delete_items(self, request: DbRequest, db_items: Set[InputKeyType], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Optional[Exception]]], Optional[Exception]]:
1079        result_items: Dict[DbId, Dict[RawKeyType, Optional[Exception]]] = dict()
1080        env_id = request.env_id
1081
1082        if env_id not in self.deletion_cache:
1083            self.deletion_cache[env_id] = dict()
1084        
1085        env_deletion_cache = self.deletion_cache[env_id]
1086        if db_id not in result_items:
1087            result_items[db_id] = dict()
1088        
1089        result_db_items = result_items[db_id]
1090        
1091        if db_id not in env_deletion_cache:
1092            env_deletion_cache[db_id] = set()
1093        
1094        env_deletion_cache_db = env_deletion_cache[db_id]
1095        for key in db_items:
1096            key = self.serializer.dumps(normalize_compound_key(key))
1097            
1098            exception = None
1099            try:
1100                env_deletion_cache_db.add(key)
1101            except:
1102                exception = get_exception()
1103            
1104            result_db_items[key] = exception
1105
1106        self.make_live()
1107        return True, result_items, None
1108    
1109    def _on_delete_kv_items(self, request: DbRequest, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Tuple[bool, Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]], Optional[Exception]]:
1110        result_items: Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]] = dict()
1111        env_id = request.env_id
1112
1113        if env_id not in self.kv_deletion_cache:
1114            self.kv_deletion_cache[env_id] = dict()
1115        
1116        env_kv_deletion_cache = self.kv_deletion_cache[env_id]
1117        if db_id not in result_items:
1118            result_items[db_id] = dict()
1119        
1120        result_db_items = result_items[db_id]
1121        
1122        if db_id not in env_kv_deletion_cache:
1123            env_kv_deletion_cache[db_id] = dict()
1124        
1125        env_kv_deletion_cache_db = env_kv_deletion_cache[db_id]
1126        for key, value in db_items.items():
1127            exception = None
1128            result = None
1129            try:
1130                key = self.serializer.dumps(normalize_compound_key(key))
1131                result = env_kv_deletion_cache_db[key] = self.serializer.dumps(value)
1132            except:
1133                exception = get_exception()
1134            
1135            result_db_items[key] = (result, exception)
1136        
1137        self.make_live()
1138        return True, result_items, None
1139    
1140    def _on_open_databases(self, request: DbRequest, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse:
1141        exception = None
1142        try:
1143            env_info: EnvInfo = self.db_environments[request.env_id]
1144        except KeyError:
1145            exception = UnknownEnvError(request.env_id)
1146        
1147        for db_id, db_name in db_names.items():
1148            env_info.databases[db_id] = env_info.env.open_db(db_name)
1149            env_info.db_names[db_id] = db_name
1150        
1151        env_info.env.sync(True)
1152        return True, None, exception
1153    
1154    def _on_drop_db(self, request: DbRequest, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse:
1155        coro_id = self.current_caller_coro_info.coro_id
1156        env_id = request.env_id
1157
1158        if env_id not in self.drop_db_requests:
1159            self.drop_db_requests[env_id] = dict()
1160        
1161        drop_db_requests_env = self.drop_db_requests[env_id]
1162
1163        if db_id not in drop_db_requests_env:
1164            drop_db_requests_env[db_id] = [False, set()]
1165        
1166        drop_db_requests_env_db = drop_db_requests_env[db_id]
1167
1168        if delete:
1169            drop_db_requests_env_db[0] = delete
1170        
1171        drop_db_requests_env_db[1].add(coro_id)
1172        self.make_live()
1173        return False, None, None
1174    
1175    def sync_in_thread_pool(self, env_id: EnvId = None):
1176        async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
1177            if need_to_ensure_asyncio_loop:
1178                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
1179            else:
1180                if asyncio_loop is None:
1181                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get())
1182            
1183            async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
1184                def sync_worker():
1185                    env.sync(True)
1186                    self.write_locked.discard(env_id)
1187                
1188                await task_in_thread_pool(asyncio_loop, sync_worker)
1189
1190            env: lmdb.Environment = self.db_environments[env_id].env
1191            await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env)))
1192            self.write_locked_coro_id.discard(i.coro_id)
1193            def make_service_live_for_a_next_sync(self: 'Db'):
1194                self.make_live()
1195            
1196            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)
1197
1198        asyncio_loop = None
1199        need_to_ensure_asyncio_loop = False
1200        try:
1201            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
1202        except AsyncioLoopWasNotSetError:
1203            need_to_ensure_asyncio_loop = True
1204
1205        coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, env_id, self, asyncio_loop, need_to_ensure_asyncio_loop)
1206        coro.is_background_coro = True
1207        self.write_locked.add(env_id)
1208        self.write_locked_coro_id.add(coro.coro_id)
Db( loop: Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
301    def __init__(self, loop: CoroSchedulerType):
302        super(Db, self).__init__(loop)
303        self.default_db_id: DbName = b'__default__'
304        self.default_env_name: str = '__default__.dbenv'
305        # self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
306        # self.drop_db_requests: Dict[EnvId, Dict[CoroID, Tuple[DbId, bool]]] = dict()
307        self.drop_db_requests: Dict[EnvId, Dict[DbId, List[bool, Set[CoroID]]]] = dict()
308        # self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
309        self.read_queue: Dict[EnvId, Dict[DbId, Dict[RawKeyType, Set[CoroID]]]] = dict()
310        self.massive_read_queue: Dict[EnvId, Dict[CoroID, Dict[DbId, Set[RawKeyType]]]] = dict()
311        # self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
312        self.data_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
313        # self.data_cache: Dict[EnvId, Dict[Tuple[RawKeyType, DbId], RawValueType]] = dict()
314        self.deletion_cache: Dict[EnvId, Dict[DbId, Set[RawKeyType]]] = dict()
315        self.kv_deletion_cache: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]] = dict()
316        # self.kv_deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
317        self.get_first_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
318        self.get_last_queue: Dict[EnvId, Dict[DbId, Set[CoroID]]] = dict()
319        self.get_n_items_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, int, bool]]] = dict()
320        self.get_items_range_queue: Dict[EnvId, Dict[CoroID, Tuple[DbId, RawKeyType, RawKeyType, int, bool]]] = dict()
321        self.get_all_items_queue: List[Tuple[CoroID, DbId, EnvId]] = list()
322        self.path_to_default_db_environment: str = None
323        self.app_name_waiter: CoroWrapperBase = None
324        self.default_db_environment: lmdb.Environment = None
325        self.db_environments: Dict[EnvId, EnvInfo] = dict()
326        # self.databases: Dict[Hashable, Any] = dict()
327        # self.db_names: Dict[DbId, DbName] = dict()
328        self.async_loop = None
329        self.sync_time_interval = 1.0
330        self.last_sync_time = perf_counter()
331        self.force_sync: Set[EnvId] = set()
332        self.write_locked: Set[EnvId] = set()
333        self.writes_num: int = 0
334        self.reads_num: int = 0
335        self.deletes_num: int = 0
336        self.db_drops_num: int = 0
337        self.write_locked_coro_id: Set[CoroID] = set()
338        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
339        #                                    Tags.can_use_bytes,
340        #                                    Tags.decode_str_as_str,
341        #                                    Tags.decode_list_as_list,
342        #                                    Tags.decode_bytes_as_bytes,
343        #                                    Tags.superficial,
344        #                                    Tags.current_platform,
345        #                                    Tags.multi_platform),
346        #                                   TestDataType.small,
347        #                                   0.1)
348        self.serializer = Serializer(Serializers.msgspec_messagepack)
349
350        self._request_workers = {
351            0: self._on_set_default_db_environment_path,
352            1: self._on_open_databases,
353            2: self._on_drop_db,
354            3: self._on_sync,
355            4: self._on_get,
356            5: self._on_get_items,
357            6: self._on_get_all_items,
358            7: self._on_put,
359            8: self._on_put_items,
360            9: self._on_delete,
361            10: self._on_delete_kv,
362            11: self._on_delete_items,
363            12: self._on_delete_kv_items,
364            13: self._on_open_db_environment,
365            14: self._on_get_first,
366            15: self._on_get_last,
367            16: self._on_get_n_items,
368            17: self._on_get_items_range,
369        }
default_db_id: bytes
default_env_name: str
drop_db_requests: 'Dict[(EnvId, Dict[(DbId, List[(bool, Set[CoroID])])])]'
read_queue: Dict[Hashable, Dict[Hashable, Dict[bytes, Set[int]]]]
massive_read_queue: Dict[Hashable, Dict[int, Dict[Hashable, Set[bytes]]]]
data_cache: Dict[Hashable, Dict[Hashable, Dict[bytes, bytes]]]
deletion_cache: Dict[Hashable, Dict[Hashable, Set[bytes]]]
kv_deletion_cache: Dict[Hashable, Dict[Hashable, Dict[bytes, bytes]]]
get_first_queue: Dict[Hashable, Dict[Hashable, Set[int]]]
get_last_queue: Dict[Hashable, Dict[Hashable, Set[int]]]
get_n_items_queue: Dict[Hashable, Dict[int, Tuple[Hashable, bytes, int, bool]]]
get_items_range_queue: Dict[Hashable, Dict[int, Tuple[Hashable, bytes, bytes, int, bool]]]
get_all_items_queue: List[Tuple[int, Hashable, Hashable]]
path_to_default_db_environment: str
app_name_waiter: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroWrapperBase
default_db_environment: Environment
db_environments: Dict[Hashable, cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.EnvInfo]
async_loop
sync_time_interval
last_sync_time
force_sync: Set[Hashable]
write_locked: Set[Hashable]
writes_num: int
reads_num: int
deletes_num: int
db_drops_num: int
write_locked_coro_id: Set[int]
serializer
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
372    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
373        return type(self).__name__, {
374            'db_names': list(self.db_names.keys()),
375            'writes num': self.writes_num,
376            'reads num': self.reads_num,
377            'deletes num': self.deletes_num,
378            'db drops num': self.db_drops_num,
379        }
def norm_key( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]]) -> Union[bytes, str, Tuple[Union[bytes, str]], tuple[Union[Tuple[Union[bytes, str]], Tuple[ForwardRef('NormalizedCompoundKeyType')]]]]:
381    def norm_key(self, key: InputKeyType) -> NormalizedKeyType:
382        return normalize_compound_key(key)
def raw_key( self, env_or_id: Union[Environment, Hashable], key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]]) -> bytes:
384    def raw_key(self, env_or_id: Union[lmdb.Environment, EnvId], key: InputKeyType) -> RawKeyType:
385        raw_key: bytes = self.serializer.dumps(normalize_compound_key(key))
386        if isinstance(env_or_id, lmdb.Environment):
387            env = env_or_id
388        else:
389            env = self.db_environments[env_or_id].env
390        
391        if len(raw_key) > env.max_key_size():
392            raise RawKeyIsTooLargeError(f'Raw form ({raw_key=}) of the key ({key=}) is too large: {len(raw_key)} > {env.max_key_size()}')
def single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
394    def single_task_registration_or_immediate_processing(
395            self, *args, **kwargs) -> ServiceProcessingResponse:
396        result = self.try_resolve_request(*args, **kwargs)
397        if result is None:
398            return True, None, None
399        else:
400            return result
def full_processing_iteration(self):
402    def full_processing_iteration(self):
403        if self.default_db_environment is None:
404            if self.path_to_default_db_environment is None:
405                if self.app_name_waiter is None:
406                    async def coro(i: Interface, self: 'Db'):
407                        app_name_for_fs = await i(InstanceRequest().wait('app_name_for_fs'))
408                        app_data_dir_path_type: AppDirectoryType = await i(InstanceRequest().wait('app_data_dir_path_type'))
409                        app_dir_path: AppDirPath = await i(InstanceRequest().wait(AppDirPath))
410                        app_data_dir_path: str = app_dir_path.cached(app_data_dir_path_type, app_name_for_fs)
411                        self.path_to_default_db_environment = RelativePath(app_data_dir_path)(self.default_env_name)
412                        self.app_name_waiter = None
413                        self.make_live()
414                    
415                    self.app_name_waiter = put_root_from_other_service(self, coro, self)
416                    self.app_name_waiter.is_background_coro = True
417                
418                self.make_dead()
419                return
420            else:
421                self._init_default_db()
422        
423        if self.force_sync:
424            envs_need_to_be_sync: Set[DbId] = self.force_sync
425            self.force_sync = set()
426        else:
427            envs_need_to_be_sync = set()
428        
429        data_cache_buff = self.data_cache
430        self.data_cache = type(data_cache_buff)()
431        
432        read_queue_buff = self.read_queue
433        self.read_queue = type(read_queue_buff)()
434        
435        massive_read_queue_buff = self.massive_read_queue
436        self.massive_read_queue = type(massive_read_queue_buff)()
437        
438        kv_deletion_cache_buff = self.kv_deletion_cache
439        self.kv_deletion_cache = type(kv_deletion_cache_buff)()
440        
441        get_all_items_queue_buff = self.get_all_items_queue
442        self.get_all_items_queue = type(get_all_items_queue_buff)()
443
444        get_first_queue_buff = self.get_first_queue
445        self.get_first_queue = type(get_first_queue_buff)()
446
447        get_last_queue_buff = self.get_last_queue
448        self.get_last_queue = type(get_last_queue_buff)()
449
450        get_n_items_queue_buff = self.get_n_items_queue
451        self.get_n_items_queue = type(get_n_items_queue_buff)()
452
453        get_items_range_queue_buff = self.get_items_range_queue
454        self.get_items_range_queue = type(get_items_range_queue_buff)()
455
456        # put
457        def put_handler(env_info: EnvInfo, put_info: Dict[DbId, Dict[RawKeyType, RawValueType]]):
458            try:
459                with env_info.env.begin(write=True) as txn:
460                    for db_id, db_put_info in put_info.items():
461                        if db_id in env_info.databases:
462                            for raw_key, value in db_put_info.items():
463                                txn.put(raw_key, value, db=env_info.databases[db_id], dupdata=False, append=False)
464                        
465                        self.writes_num += len(db_put_info)
466            except lmdb.MapFullError:
467                raise DBError.from_exception(db_id)
468        
469        for env_id, put_info in data_cache_buff.items():
470            if env_id in self.db_environments:
471                envs_need_to_be_sync.add(env_id)
472                lmdb_reapplier(self.db_environments[env_id], put_handler, put_info)
473
474        # TODO: implement delete* methods processing
475        # delete
476        for env_id, kv_deletion_cache_buff_db_info in kv_deletion_cache_buff.items():
477            if env_id in self.db_environments:
478                envs_need_to_be_sync.add(env_id)
479                ...
480
481        for key_info, value in kv_deletion_cache_buff.items():
482            with self.default_db_environment.begin(write=True) as txn:
483                key, db_id = key_info
484                txn.delete(key, value, db=self.databases[db_id])
485                self.deletes_num += 1
486
487        # drop
488        drop_db_requests_buff = self.drop_db_requests
489        self.drop_db_requests = type(drop_db_requests_buff)()
490        
491        def drop_handler(env_info: EnvInfo, drop_info: Dict[DbId, List[bool, Set[CoroID]]]):
492            for db_id, db_drop_info in drop_info.items():
493                delete_db, coro_id = db_drop_info
494                if db_id in env_info.databases:
495                    try:
496                        with env_info.env.begin(write=True) as txn:
497                            txn.drop(db=env_info.databases[db_id], delete=delete_db)
498                            if delete_db:
499                                del env_info.databases[db_id]
500                                del env_info.db_names[db_id]
501                        
502                        self.db_drops_num += 1
503                    except lmdb.MapFullError:
504                        raise DBError.from_exception(db_id)
505                    
506                    self.register_response(coro_id, None, UnknownEnvError(env_id))
507                else:
508                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
509        
510        for env_id, drop_info in drop_db_requests_buff.items():
511            if env_id in self.db_environments:
512                envs_need_to_be_sync.add(env_id)
513                lmdb_reapplier(self.db_environments[env_id], drop_handler, drop_info)
514            else:
515                for db_id, db_drop_info in drop_info.items():
516                    delete_db, coro_id = db_drop_info
517                    self.register_response(coro_id, None, UnknownEnvError(env_id))
518
519        # get
520        def get_item(txn, key_info: Tuple[RawKeyType, DbId, EnvId], data_cache_buff: Dict[EnvId, Dict[DbId, Dict[RawKeyType, RawValueType]]]) -> Tuple[ValueType, Optional[Exception]]:
521            key, db_id, env_id = key_info
522            need_to_get_from_db = True
523            try:
524                value = data_cache_buff[env_id][db_id][key]
525                need_to_get_from_db = False
526            except KeyError:
527                pass
528            
529            if need_to_get_from_db:
530                value = txn.get(key, db=self.db_environments[env_id].databases[db_id])
531                self.reads_num += 1
532            
533            exception = None
534            try:
535                if value is None:
536                    exception = DbKeyError(key_info)
537                else:
538                    value = self.serializer.loads(value)
539            except:
540                exception = get_exception()
541            
542            return value, exception
543        
544        # _on_get
545        for env_id, read_queue_buff_db_info in read_queue_buff.items():
546            if env_id in self.db_environments:
547                env_info = self.db_environments[env_id]
548                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
549                    if db_id in env_info.databases:
550                        with env_info.env.begin() as txn:
551                            for raw_key, coro_ids in read_queue_buff_db_key_info.items():
552                                value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
553                                for coro_id in coro_ids:
554                                    self.register_response(coro_id, value, exception)
555                    else:
556                        for raw_key, coro_ids in read_queue_buff_db_key_info.items():
557                            for coro_id in coro_ids:
558                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
559            else:
560                for db_id, read_queue_buff_db_key_info in read_queue_buff_db_info.items():
561                    for raw_key, coro_ids in read_queue_buff_db_key_info.items():
562                        for coro_id in coro_ids:
563                            self.register_response(coro_id, None, UnknownEnvError(env_id))
564        
565        # _on_get_items
566        results: Dict[CoroID, Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]] = dict()
567        for env_id, massive_read_queue_buff_coro_info in massive_read_queue_buff.items():
568            if env_id in self.db_environments:
569                env_info = self.db_environments[env_id]
570                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
571                    if coro_id not in results:
572                        results[coro_id] = dict()
573                    
574                    coro_results = results[coro_id]
575                    for db_id, raw_keys in read_queue_buff_db_info.items():
576                        if db_id not in coro_results:
577                            coro_results[db_id] = dict()
578                        
579                        coro_db_results = coro_results[db_id]
580                        if db_id in env_info.databases:
581                            with env_info.env.begin() as txn:
582                                for raw_key in raw_keys:
583                                    value, exception = get_item(txn, (raw_key, db_id, env_id), data_cache_buff)
584                                    coro_db_results[normalize_compound_key(raw_key)] = (value, exception)
585                        else:
586                            for coro_id in coro_ids:
587                                self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
588            else:
589                for coro_id, read_queue_buff_db_info in massive_read_queue_buff_coro_info.items():
590                    for db_id, raw_keys in read_queue_buff_db_info.items():
591                        for coro_id in coro_ids:
592                            self.register_response(coro_id, None, UnknownEnvError(env_id))
593        
594        for coro_id, coro_results in results.items():
595            self.register_response(coro_id, coro_results, None)
596
597        # get all items
598        for coro_id, db_id, env_id in get_all_items_queue_buff:
599            if env_id in self.db_environments:
600                env_info = self.db_environments[env_id]
601                env = env_info.env
602                if db_id in env_info.databases:
603                    db = env_info.databases[db_id]
604                    with env.begin(db=db) as txn:
605                        result = list()
606                        exception = None
607                        try:
608                            result = [(normalize_compound_key(self.serializer.loads(k)), self.serializer.loads(v)) for k, v in txn.cursor()]
609                            self.reads_num += len(result)
610                        except:
611                            exception = get_exception()
612                        
613                        self.register_response(coro_id, result, exception)
614                else:
615                    self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
616            else:
617                self.register_response(coro_id, None, UnknownEnvError(env_id))
618        
619        # get_first
620        for env_id, get_first_queue_buff_db_info in get_first_queue_buff.items():
621            if env_id in self.db_environments:
622                env_info = self.db_environments[env_id]
623                for db_id, coro_ids in get_first_queue_buff_db_info.items():
624                    if db_id in env_info.databases:
625                        db = env_info.databases[db_id]
626                        with env_info.env.begin(db=db) as txn:
627                            result = None
628                            exception = None
629                            try:
630                                cursor: lmdb.Cursor = txn.cursor()
631                                if cursor.first():
632                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
633                                    self.reads_num += 1
634                                else:
635                                    exception = KeyError()
636                            except:
637                                exception = get_exception()
638                            
639                            for coro_id in coro_ids:
640                                self.register_response(coro_id, result, exception)
641                    else:
642                        for coro_id in coro_ids:
643                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
644            else:
645                for db_id, coro_ids in get_first_queue_buff_db_info.items():
646                    for coro_id in coro_ids:
647                        self.register_response(coro_id, None, UnknownEnvError(env_id))
648        
649        # get_last
650        for env_id, get_last_queue_buff_db_info in get_last_queue_buff.items():
651            if env_id in self.db_environments:
652                env_info = self.db_environments[env_id]
653                for db_id, coro_ids in get_last_queue_buff_db_info.items():
654                    if db_id in env_info.databases:
655                        db = env_info.databases[db_id]
656                        with env_info.env.begin(db=db) as txn:
657                            result = None
658                            exception = None
659                            try:
660                                cursor: lmdb.Cursor = txn.cursor()
661                                if cursor.first():
662                                    result = (normalize_compound_key(self.serializer.loads(cursor.key())), self.serializer.loads(cursor.value()))
663                                    self.reads_num += 1
664                                else:
665                                    exception = KeyError()
666                            except:
667                                exception = get_exception()
668                            
669                            for coro_id in coro_ids:
670                                self.register_response(coro_id, result, exception)
671                    else:
672                        for coro_id in coro_ids:
673                            self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
674            else:
675                for db_id, coro_ids in get_last_queue_buff_db_info.items():
676                    for coro_id in coro_ids:
677                        self.register_response(coro_id, None, UnknownEnvError(env_id))
678        
679        # get_n_items
680        for env_id, get_n_items_queue_buff_coro_info in get_n_items_queue_buff.items():
681            if env_id in self.db_environments:
682                env_info = self.db_environments[env_id]
683                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
684                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
685                    db_id, first_desired_raw_key, num_items, reverse = read_queue_buff_db_info
686                    if db_id in env_info.databases:
687                        db = env_info.databases[db_id]
688                        with env_info.env.begin(db=db) as txn:
689                            result = None
690                            exception = None
691                            try:
692                                cursor: lmdb.Cursor = txn.cursor()
693                                if cursor.set_range(first_desired_raw_key):
694                                    if reverse:
695                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
696                                    else:
697                                        cursor_iterator = cursor.iternext(keys=True, values=True)
698                                    
699                                    for raw_key, raw_value in cursor_iterator:
700                                        if (num_items is not None) and (num_items <= 0):
701                                            break
702                                        
703                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
704                                        self.reads_num += 1
705                                        if num_items is not None:
706                                            num_items -= 1
707                                else:
708                                    exception = KeyError()
709                            except:
710                                exception = get_exception()
711                            
712                            self.register_response(coro_id, result, exception)
713                    else:
714                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
715            else:
716                for coro_id, read_queue_buff_db_info in get_n_items_queue_buff_coro_info.items():
717                    for coro_id in coro_ids:
718                        self.register_response(coro_id, None, UnknownEnvError(env_id))
719        
720        # get_items_range
721        for env_id, get_items_range_queue_buff_coro_info in get_items_range_queue_buff.items():
722            if env_id in self.db_environments:
723                env_info = self.db_environments[env_id]
724                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
725                    coro_results: List[Tuple[NormalizedKeyType, ValueType]] = list()
726                    db_id, first_desired_raw_key, last_desired_raw_key, num_items, reverse = read_queue_buff_db_info
727                    if db_id in env_info.databases:
728                        db = env_info.databases[db_id]
729                        with env_info.env.begin(db=db) as txn:
730                            result = None
731                            exception = None
732                            try:
733                                cursor: lmdb.Cursor = txn.cursor()
734                                if cursor.set_range(first_desired_raw_key):
735                                    if reverse:
736                                        cursor_iterator = cursor.iterprev(keys=True, values=True)
737                                    else:
738                                        cursor_iterator = cursor.iternext(keys=True, values=True)
739                                    
740                                    for raw_key, raw_value in cursor_iterator:
741                                        if reverse:
742                                            if raw_key < last_desired_raw_key:
743                                                break
744                                        else:
745                                            if raw_key > last_desired_raw_key:
746                                                break
747
748                                        if (num_items is not None) and (num_items <= 0):
749                                            break
750
751                                        coro_results.append((normalize_compound_key(self.serializer.loads(raw_key)), self.serializer.loads(raw_value)))
752                                        self.reads_num += 1
753                                        if num_items is not None:
754                                            num_items -= 1
755                                else:
756                                    exception = KeyError()
757                            except:
758                                exception = get_exception()
759                            
760                            self.register_response(coro_id, result, exception)
761                    else:
762                        self.register_response(coro_id, None, UnknownEnvDBError(env_id, db_id))
763            else:
764                for coro_id, read_queue_buff_db_info in get_items_range_queue_buff_coro_info.items():
765                    for coro_id in coro_ids:
766                        self.register_response(coro_id, None, UnknownEnvError(env_id))
767        
768        # sync
769        if envs_need_to_be_sync:
770            for env_id in envs_need_to_be_sync:
771                self.sync_in_thread_pool(env_id)
772            
773            self.last_sync_time = perf_counter()
774
775        self.make_dead()
def in_work(self) -> bool:
def in_work(self) -> bool:
    """Report whether the service has something to process right now.

    The raw answer is ``True`` when any read-type queue is non-empty or a
    forced sync was requested. Otherwise it is ``True`` only when the service
    is not write-locked, has unsynced changes (cached data, pending key/value
    deletions or pending database drops) and at least ``sync_time_interval``
    has passed since ``last_sync_time``.

    Returns:
        bool: whatever ``thrifty_in_work()`` returns for the raw answer.
    """
    # The checks run in the same order as the original single expression so
    # that short-circuiting (and the single perf_counter() call) is preserved.
    if (
        self.get_first_queue
        or self.get_last_queue
        or self.get_n_items_queue
        or self.get_items_range_queue
        or self.read_queue
        or self.massive_read_queue
        or self.get_all_items_queue
        or self.force_sync
    ):
        result: bool = True
    elif self.write_locked:
        # A sync is still in flight: do not start processing writes yet.
        result = False
    elif self.data_cache or self.kv_deletion_cache or self.drop_db_requests:
        result = (perf_counter() - self.last_sync_time) >= self.sync_time_interval
    else:
        result = False

    return self.thrifty_in_work(result)

Executed twice per iteration: once before and once after full_processing_iteration() runs.

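Raises: NotImplementedError: presumably raised by a base implementation that has not been overridden; the in_work() listing above contains no explicit raise.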

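Returns: bool: the value of thrifty_in_work(result), where result is True when any read queue is non-empty, a forced sync is requested, or unsynced changes exist while nothing is write-locked and sync_time_interval has elapsed since the last sync.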

def time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
    """Tell how long until the next periodic sync becomes due.

    Returns:
        A ``(True, seconds)`` pair. ``seconds`` is the part of
        ``sync_time_interval`` that has not yet elapsed since
        ``last_sync_time``, or ``0`` once the interval has fully elapsed.
    """
    elapsed: float = perf_counter() - self.last_sync_time
    if elapsed < self.sync_time_interval:
        return True, self.sync_time_interval - elapsed

    return True, 0
def sync_in_thread_pool(self, env_id: Hashable = None):
def sync_in_thread_pool(self, env_id: EnvId = None):
    """Schedule a background coroutine that syncs one LMDB environment.

    The method itself does not block: it looks up the asyncio loop (or notes
    that one must be ensured later), puts ``sync_db_coro`` into the scheduler
    as a background coroutine, and marks ``env_id`` / the coroutine id as
    write-locked. The coroutine then:

    1. obtains an asyncio loop if none was available,
    2. runs ``env.sync(True)`` through ``task_in_thread_pool`` and removes
       ``env_id`` from ``write_locked``,
    3. removes its own id from ``write_locked_coro_id``,
    4. asks ``TimerFuncRunner`` to call ``make_live()`` after
       ``sync_time_interval`` so the service wakes up for the next sync.

    Args:
        env_id: key into ``self.db_environments`` identifying the environment
            to sync. The lookup happens inside the coroutine, so an unknown
            id fails there (``KeyError``), not in this call.
    """
    async def sync_db_coro(i: Interface, self: 'Db', env_id: EnvId, asyncio_loop, need_to_ensure_asyncio_loop: bool):
        if need_to_ensure_asyncio_loop:
            asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
        else:
            if asyncio_loop is None:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest.get())

        async def sync_db(self: 'Db', asyncio_loop, env: lmdb.Environment):
            def sync_worker():
                env.sync(True)
                # NOTE(review): this set is mutated from whatever executes
                # sync_worker (presumably a pool thread) — confirm that is safe.
                self.write_locked.discard(env_id)

            await task_in_thread_pool(asyncio_loop, sync_worker)

        env: lmdb.Environment = self.db_environments[env_id].env
        await i(AsyncioLoop, AsyncioLoopRequest.wait(sync_db(self, asyncio_loop, env)))
        self.write_locked_coro_id.discard(i.coro_id)

        def make_service_live_for_a_next_sync(self: 'Db'):
            self.make_live()

        await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

    asyncio_loop = None
    need_to_ensure_asyncio_loop = False
    try:
        asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
    except AsyncioLoopWasNotSetError:
        # No loop yet: let the coroutine create/ensure one when it runs.
        need_to_ensure_asyncio_loop = True

    # Arguments must follow sync_db_coro's parameter order after the interface:
    # (self, env_id, asyncio_loop, need_to_ensure_asyncio_loop). The previous
    # call passed ``env_id, self``, which bound the Db instance to ``env_id``
    # and the environment id to ``self`` inside the coroutine.
    coro: CoroWrapperBase = put_root_from_other_service(self, sync_db_coro, self, env_id, asyncio_loop, need_to_ensure_asyncio_loop)
    coro.is_background_coro = True
    self.write_locked.add(env_id)
    self.write_locked_coro_id.add(coro.coro_id)
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
class DbRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class DbRequest(ServiceRequest):
    """Request builder for the ``Db`` service.

    Every public method forwards a numeric request type plus its arguments to
    ``_save_to_copy()``. The return annotations presumably describe the
    response the requesting coroutine eventually receives rather than the
    direct return value of ``_save_to_copy()`` — confirm against
    ``ServiceRequest``.
    """

    def __init__(self, env_id: EnvId = None, db_id: DbId = None):
        """Remember the environment and default database ids for later requests.

        Args:
            env_id: environment id stored on the request (``None`` by default).
            db_id: database id that, when not ``None``, takes precedence in
                ``_get_db_id()``.
        """
        super().__init__()
        self.env_id: EnvId = env_id
        self.db_id: DbId = db_id
        self.provide_to_request_handler = True

    def _copy(self) -> 'DbRequest':
        """Return a new ``DbRequest`` carrying the same ``env_id`` and ``db_id``."""
        return DbRequest(self.env_id, self.db_id)

    def _get_db_id(self, db_id: DbId) -> DbId:
        """Return ``self.db_id`` when it is set, otherwise the given ``db_id``."""
        if self.db_id is None:
            return db_id
        else:
            return self.db_id

    def set_default_db_environment_path(self, path_to_db_environment: str) -> bool:
        """Build request type 0 with the default environment path."""
        return self._save_to_copy(0, path_to_db_environment)

    def open_databases(self, db_names: Dict[DbId, DbName]) -> None:
        """Build request type 1 with a mapping of database ids to database names."""
        return self._save_to_copy(1, db_names)

    def drop_db(self, db_id: DbId, delete: bool = False) -> None:
        """Build request type 2 for dropping ``db_id``; ``delete`` is forwarded as given."""
        return self._save_to_copy(2, db_id, delete)

    def sync(self) -> None:
        """Build request type 3 (sync); it carries no arguments."""
        return self._save_to_copy(3)

    def get(self, key: InputKeyType, db_id: DbId = None) -> ValueType:
        """Build request type 4: read the value stored under ``key``."""
        return self._save_to_copy(4, key, db_id)

    def get_first(self, db_id: DbId = None) -> Tuple[NormalizedKeyType, ValueType]:
        """Build request type 14: the first item in the DB as a ``(key, value)`` pair."""
        return self._save_to_copy(14, db_id)

    def get_last(self, db_id: DbId = None) -> Tuple[NormalizedKeyType, ValueType]:
        """Build request type 15: the last item in the DB as a ``(key, value)`` pair."""
        return self._save_to_copy(15, db_id)

    def get_items(self, db_keys: Sequence[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]:
        """Build request type 5: read several keys at once.

        The response maps database id -> normalized key -> ``(value, exception)``.
        """
        return self._save_to_copy(5, db_keys, db_id)

    def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
        """Build request type 16 (forward): up to ``num`` items starting at ``desired_key``.

        ``num=None`` places no limit on the number of items.
        """
        return self._save_to_copy(16, desired_key, num, db_id, reverse=False)

    def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
        """Build request type 16 with ``reverse=True`` (iteration in reverse order)."""
        return self._save_to_copy(16, desired_key, num, db_id, reverse=True)

    def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
        """Build request type 17 (forward): items between the two keys, at most ``num``."""
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=False)

    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
        """Build request type 17 with ``reverse=True`` (iteration in reverse order)."""
        return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=True)

    def get_all_items(self, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
        """Build request type 6: all items from the DB as ``(key, value)`` pairs."""
        return self._save_to_copy(6, db_id)

    def put(self, key: InputKeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> RawValueType:
        """Build request type 7: store ``value`` under ``key``."""
        return self._save_to_copy(7, key, value, db_id)

    def put_items(self, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        """Build request type 8: store several key/value pairs at once."""
        return self._save_to_copy(8, db_items, db_id)

    def delete(self, key: InputKeyType, db_id: DbId = None) -> RawValueType:
        """Build request type 9: delete ``key``."""
        return self._save_to_copy(9, key, db_id)

    def delete_kv(self, key: InputKeyType, value: ValueType, db_id: DbId = None) -> RawValueType:
        """Build request type 10: delete the given ``key``/``value`` pair."""
        return self._save_to_copy(10, key, value, db_id)

    def delete_items(self, db_items: Set[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        """Build request type 11: delete several keys at once."""
        return self._save_to_copy(11, db_items, db_id)

    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
        """Build request type 12: delete several key/value pairs at once."""
        return self._save_to_copy(12, db_items, db_id)

    def open_db_environment(self, path_to_db_environment: str) -> EnvId:
        """Build request type 13: open the environment at ``path_to_db_environment``."""
        return self._save_to_copy(13, path_to_db_environment)

    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
        """Build request type 18: lock the given databases.

        Locks all databases if ``db_names`` is None. Databases will be
        released automatically when the coroutine execution is finished.
        """
        return self._save_to_copy(18, db_names)

    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:
        """Build request type 19: try to lock the given databases.

        Tries to lock all databases if ``db_names`` is None. Returns True if
        the attempt was successful, False otherwise. Databases will be
        released automatically when the coroutine execution is finished.
        """
        return self._save_to_copy(19, db_names)

    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
        """Build an unlock request. Unlocks all databases if ``db_names`` is None."""
        # NOTE(review): this reuses request type 18, the same id as
        # lock_databases(); it looks like a copy-paste slip. The correct id
        # is not visible from here — confirm against the Db request handlers.
        return self._save_to_copy(18, db_names)
DbRequest(env_id: Hashable = None, db_id: Hashable = None)
def __init__(self, env_id: EnvId = None, db_id: DbId = None):
    """Initialise the base request, then store the ids and opt in to handler delivery.

    Args:
        env_id: environment id kept on the request.
        db_id: database id kept on the request.
    """
    super().__init__()
    # Set after the base initialiser so these values are the final ones.
    self.provide_to_request_handler = True
    self.db_id: DbId = db_id
    self.env_id: EnvId = env_id
env_id: Hashable
db_id: Hashable
provide_to_request_handler
def set_default_db_environment_path(self, path_to_db_environment: str) -> bool:
def set_default_db_environment_path(self, path_to_db_environment: str) -> bool:
    """Record request type 0 carrying the default environment path."""
    request_type = 0
    return self._save_to_copy(request_type, path_to_db_environment)
def open_databases(self, db_names: Dict[Hashable, bytes]) -> None:
def open_databases(self, db_names: Dict[DbId, DbName]) -> None:
    """Record request type 1 with the mapping of database ids to names."""
    request_type = 1
    return self._save_to_copy(request_type, db_names)
def drop_db(self, db_id: Hashable, delete: bool = False) -> None:
def drop_db(self, db_id: DbId, delete: bool = False) -> None:
    """Record request type 2 for ``db_id``; ``delete`` is passed through unchanged."""
    arguments = (2, db_id, delete)
    return self._save_to_copy(*arguments)
def sync(self) -> None:
def sync(self) -> None:
    """Record request type 3 (sync); the request has no arguments."""
    request_type = 3
    return self._save_to_copy(request_type)
def get( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], db_id: Hashable = None) -> Any:
def get(self, key: InputKeyType, db_id: DbId = None) -> ValueType:
    """Record request type 4: read the value stored under ``key``."""
    arguments = (4, key, db_id)
    return self._save_to_copy(*arguments)
def get_first( self, db_id: Hashable = None) -> dict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def get_first(self, db_id: DbId = None) -> Tuple[NormalizedKeyType, ValueType]:
    """Record request type 14: the first item in the DB.

    The handler answers with a ``(normalized_key, value)`` pair, so the
    return annotation is a ``Tuple`` (it previously claimed a ``Dict``).
    """
    return self._save_to_copy(14, db_id)
def get_last( self, db_id: Hashable = None) -> dict[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]:
def get_last(self, db_id: DbId = None) -> Tuple[NormalizedKeyType, ValueType]:
    """Record request type 15: the last item in the DB.

    The handler answers with a ``(normalized_key, value)`` pair, so the
    return annotation is a ``Tuple`` (it previously claimed a ``Dict``).
    """
    return self._save_to_copy(15, db_id)
def get_items( self, db_keys: collections.abc.Sequence[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]], db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Tuple[typing.Any, typing.Union[Exception, NoneType]]]]:
def get_items(self, db_keys: Sequence[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[NormalizedKeyType, Tuple[ValueType, Optional[Exception]]]]:
    """Record request type 5: read several keys at once.

    The handler answers with a mapping of database id -> normalized key ->
    ``(value, exception)``, so the return annotation is that nested ``Dict``
    (it previously claimed a list of pairs).
    """
    return self._save_to_copy(5, db_keys, db_id)
def get_n_items( self, desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def get_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
    """Record request type 16 in forward order (``reverse=False``).

    ``desired_key``, ``num`` and ``db_id`` are forwarded positionally.
    """
    positional = (16, desired_key, num, db_id)
    return self._save_to_copy(*positional, reverse=False)
def get_reverse_n_items( self, desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def get_reverse_n_items(self, desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
    """Record request type 16 in reverse order (``reverse=True``).

    ``desired_key``, ``num`` and ``db_id`` are forwarded positionally.
    """
    positional = (16, desired_key, num, db_id)
    return self._save_to_copy(*positional, reverse=True)
def get_items_range( self, first_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
def get_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
    """Record request type 17 in forward order (``reverse=False``).

    Both boundary keys, ``num`` and ``db_id`` are forwarded positionally.
    """
    positional = (17, first_desired_key, last_desired_key, num, db_id)
    return self._save_to_copy(*positional, reverse=False)
def get_reverse_items_range( self, first_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], last_desired_key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], num: Union[int, NoneType] = None, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
259    def get_reverse_items_range(self, first_desired_key: InputKeyType, last_desired_key: InputKeyType, num: Optional[int] = None, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
260        return self._save_to_copy(17, first_desired_key, last_desired_key, num, db_id, reverse=True)
def get_all_items( self, db_id: Hashable = None) -> list[tuple[typing.Union[bytes, str, typing.Tuple[typing.Union[bytes, str]], tuple[typing.Union[typing.Tuple[typing.Union[bytes, str]], typing.Tuple[ForwardRef('NormalizedCompoundKeyType')]]]], typing.Any]]:
262    def get_all_items(self, db_id: DbId = None) -> List[Tuple[NormalizedKeyType, ValueType]]:
263        # Returns all items from DB
264        return self._save_to_copy(6, db_id)
def put( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], value: Union[Any, NoneType] = None, db_id: Hashable = None) -> bytes:
266    def put(self, key: InputKeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> RawValueType:
267        return self._save_to_copy(7, key, value, db_id)
def put_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Any], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
269    def put_items(self, db_items: Dict[InputKeyType, ValueType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
270        return self._save_to_copy(8, db_items, db_id)
def delete( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], db_id: Hashable = None) -> bytes:
272    def delete(self, key: InputKeyType, db_id: DbId = None) -> RawValueType:
273        return self._save_to_copy(9, key, db_id)
def delete_kv( self, key: Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], list[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], set[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]], tuple[Union[List[Union[bytes, str]], Set[Union[bytes, str]], Tuple[Union[bytes, str]], List[ForwardRef('CompoundKeyType')], Set[ForwardRef('CompoundKeyType')], Tuple[ForwardRef('CompoundKeyType')]]]], value: Any, db_id: Hashable = None) -> bytes:
275    def delete_kv(self, key: InputKeyType, value: ValueType, db_id: DbId = None) -> RawValueType:
276        return self._save_to_copy(10, key, value, db_id)
def delete_items( self, db_items: set[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]]], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
278    def delete_items(self, db_items: Set[InputKeyType], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
279        return self._save_to_copy(11, db_items, db_id)
def delete_kv_items( self, db_items: dict[typing.Union[cengal.parallel_execution.coroutines.coro_standard_services.db.versions.v_0.db.NormalizedCompoundKey, bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Tuple[typing.Any]], db_id: Hashable = None) -> Dict[Hashable, Dict[bytes, Tuple[bytes, Union[Exception, NoneType]]]]:
281    def delete_kv_items(self, db_items: Dict[InputKeyType, Tuple[ValueType]], db_id: DbId = None) -> Dict[DbId, Dict[RawKeyType, Tuple[RawValueType, Optional[Exception]]]]:
282        return self._save_to_copy(12, db_items, db_id)
def open_db_environment(self, path_to_db_environment: str) -> Hashable:
284    def open_db_environment(self, path_to_db_environment: str) -> EnvId:
285        return self._save_to_copy(13, path_to_db_environment)
def lock_databases(self, db_names: Union[Set[Hashable], NoneType] = None) -> None:
287    def lock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
288        # Locks all databases if db_names is None. Databases will be released automatically when the coroutine's execution finishes
289        return self._save_to_copy(18, db_names)
def try_lock_databases(self, db_names: Union[Set[Hashable], NoneType] = None) -> bool:
291    def try_lock_databases(self, db_names: Optional[Set[DbId]] = None) -> bool:
292        # Tries to lock all databases if db_names is None. Returns True if the attempt was successful, False otherwise. Databases will be released automatically when the coroutine's execution finishes
293        return self._save_to_copy(19, db_names)
def unlock_databases(self, db_names: Union[Set[Hashable], NoneType] = None) -> None:
295    def unlock_databases(self, db_names: Optional[Set[DbId]] = None) -> None:
296        # Unlock all databases if db_names is None
297        return self._save_to_copy(18, db_names)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'Db'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
interface
i
async_interface
ai
KeyType = typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]
RawKeyType = <class 'bytes'>
ValueType = typing.Any
RawValueType = <class 'bytes'>
DbId = typing.Hashable
DbName = <class 'bytes'>
class DbKeyError(builtins.KeyError):
123class DbKeyError(KeyError):
124    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
125        super().__init__(*args)
126        self.key_info: Tuple[KeyType, DbId] = key_info

Mapping key not found.

DbKeyError( key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable], *args: object)
124    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
125        super().__init__(*args)
126        self.key_info: Tuple[KeyType, DbId] = key_info
key_info: tuple[typing.Union[bytes, str, typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], list[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], set[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]], tuple[typing.Union[typing.List[typing.Union[bytes, str]], typing.Set[typing.Union[bytes, str]], typing.Tuple[typing.Union[bytes, str]], typing.List[ForwardRef('CompoundKeyType')], typing.Set[ForwardRef('CompoundKeyType')], typing.Tuple[ForwardRef('CompoundKeyType')]]]], typing.Hashable]
Inherited Members
builtins.BaseException
with_traceback
args