cengal.parallel_execution.coroutines.coro_standard_services.lmdb.versions.v_0.lmdb

#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['Lmdb', 'LmdbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType', 'DbId', 'DbName', 'DbKeyError']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
from cengal.file_system.file_manager import path_relative_to_current_dir
from cengal.time_management.cpu_clock_cycles import perf_counter
from cengal.data_manipulation.serialization import *
from cengal.introspection.inspect import get_exception
from typing import Hashable, Optional, Set, Tuple, List, Any, Dict, Callable, Union
import sys
import os
import asyncio
try:
    import lmdb as lmdb_lib
except ImportError:
    from warnings import warn
    warn('''WARNING: `lmdb` library is not installed. Lmdb service will not work.
         To install `lmdb` use: `pip install lmdb`''')
    raise


 44"""
 45Module Docstring
 46Docstrings: http://www.python.org/dev/peps/pep-0257/
 47"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


KeyType = Union[bytes, str, Any]
RawKeyType = bytes
ValueType = Any
RawValueType = bytes
DbId = Hashable
DbName = bytes


class LmdbRequest(ServiceRequest):
    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
        return self._save(0, path_to_db_environment)
    
    def open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceRequest:
        return self._save(1, db_names)
    
    def drop_db(self, db_id: DbId, delete: bool = False) -> ServiceRequest:
        return self._save(2, db_id, delete)
    
    def sync(self) -> ServiceRequest:
        return self._save(3)
    
    def get(self, key: KeyType, db_id: DbId = None) -> ServiceRequest:
        return self._save(4, key, db_id)
    
    def get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceRequest:
        return self._save(5, keys)
    
    def get_all_items(self, db_id: DbId = None) -> ServiceRequest:
        return self._save(6, db_id)
    
    def put(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        return self._save(7, key, value, db_id)
    
    def put_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        return self._save(8, items)
    
    def delete(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        return self._save(9, key, value, db_id)
    
    def delete_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        return self._save(10, items)

    def open_db_environment(self, path_to_db_environment: str) -> ServiceRequest:
        return self._save(11, path_to_db_environment)
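

# A minimal usage sketch (an illustrative assumption, not part of this module):
# inside a coroutine driven by cengal's CoroScheduler, requests are awaited
# through the scheduler interface `i`; because `LmdbRequest.default_service_type`
# is set to `Lmdb` below, the request alone should be enough to route the call:
#
#     async def example(i: Interface):
#         await i(LmdbRequest().open_databases({'users': b'users'}))
#         await i(LmdbRequest().put('alice', {'age': 30}, db_id='users'))
#         user = await i(LmdbRequest().get('alice', db_id='users'))
#         await i(LmdbRequest().sync())

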
def make_key_frozen(key):
    # Deserialized keys may contain lists (msgpack has no tuple type, so tuples
    # come back as lists); lists and sets are not hashable, so convert them
    # recursively to tuples.
    if isinstance(key, (list, set)):
        new_key = list()
        for item in key:
            new_key.append(make_key_frozen(item))
        
        key = tuple(new_key)

    return key
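
# For example, make_key_frozen([1, ['a', 'b']]) == (1, ('a', 'b')).

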
class Lmdb(Service, EntityStatsMixin):
    def __init__(self, loop: CoroSchedulerType):
        super(Lmdb, self).__init__(loop)
        self.default_db_name: DbName = b'__default__'
        self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
        self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
        self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
        self.data_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        self.deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        self.get_all_items_queue: List[Tuple[CoroID, DbId]] = list()
        self.path_to_db_environment = path_relative_to_current_dir('lmdb.db')
        self.db_environment = None
        self.databases: Dict[Hashable, Any] = dict()
        self.db_names: Dict[DbId, DbName] = dict()
        self.async_loop = None
        self.sync_time_interval = 1.0
        self.last_sync_time = perf_counter()
        self.force_sync = False
        self.write_locked = False
        self.writes_num: int = 0
        self.reads_num: int = 0
        self.deletes_num: int = 0
        self.db_drops_num: int = 0
        self.write_locked_coro_id: Optional[CoroID] = None
        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
        #                                    Tags.can_use_bytes,
        #                                    Tags.decode_str_as_str,
        #                                    Tags.decode_list_as_list,
        #                                    Tags.decode_bytes_as_bytes,
        #                                    Tags.superficial,
        #                                    Tags.current_platform,
        #                                    Tags.multi_platform),
        #                                   TestDataType.small,
        #                                   0.1)
        self.serializer = Serializer(Serializers.msgspec_messagepack)

        # One worker per request type; the integer keys mirror the request ids
        # used by LmdbRequest._save() above.
        self._request_workers = {
            0: self._on_set_db_environment_path,
            1: self._open_databases,
            2: self._drop_db,
            3: self._on_sync,
            4: self._on_get,
            5: self._on_get_items,
            6: self._on_get_all_items,
            7: self._on_put,
            8: self._on_put_items,
            9: self._on_delete,
            10: self._on_delete_items,
            11: self._open_db_environment,
        }

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        return type(self).__name__, {
            'db_names': list(self.db_names.keys()),
            'writes num': self.writes_num,
            'reads num': self.reads_num,
            'deletes num': self.deletes_num,
            'db drops num': self.db_drops_num,
        }

    def single_task_registration_or_immediate_processing(
            self, *args, **kwargs) -> ServiceProcessingResponse:
        result = self.try_resolve_request(*args, **kwargs)
        if result is None:
            return True, None, None
        else:
            return result

    def full_processing_iteration(self):
        self.force_sync = False
        
        if self.db_environment is None:
            self._init_db()

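        # Swap each pending buffer for a fresh, empty one before draining it:
        # requests arriving while this iteration runs accumulate in the new
        # buffers and are handled on the next iteration.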
        data_cache_buff = self.data_cache
        self.data_cache = type(data_cache_buff)()
        
        read_queue_buff = self.read_queue
        self.read_queue = type(read_queue_buff)()
        
        massive_read_queue_buff = self.massive_read_queue
        self.massive_read_queue = type(massive_read_queue_buff)()
        
        deletion_cache_buff = self.deletion_cache
        self.deletion_cache = type(deletion_cache_buff)()
        
        get_all_items_queue_buff = self.get_all_items_queue
        self.get_all_items_queue = type(get_all_items_queue_buff)()

        # put
        def put_handler(db_environment, databases):
            db_id = None
            try:
                with db_environment.begin(write=True) as txn:
                    for key_info, value in data_cache_buff.items():
                        key, db_id = key_info
                        txn.put(key, value, db=databases[db_id], dupdata=False, append=False)

                self.writes_num += len(data_cache_buff)
            except lmdb_lib.MapFullError:
                raise DBError.from_exception(db_id)
        
        lmdb_reapplier(self.db_environment, self.databases, put_handler)
        
        # delete: apply all pending deletions in a single write transaction
        if deletion_cache_buff:
            with self.db_environment.begin(write=True) as txn:
                for key_info, value in deletion_cache_buff.items():
                    key, db_id = key_info
                    txn.delete(key, value, db=self.databases[db_id])
                    self.deletes_num += 1

        # drop
        drop_db_requests_buff = self.drop_db_requests
        self.drop_db_requests = type(drop_db_requests_buff)()
        dropped_databases: Set[Hashable] = set()
        processed_coroutines: Set[CoroID] = set()
        
        def drop_handler(db_environment, databases):
            for coro_id, request in drop_db_requests_buff.items():
                if coro_id in processed_coroutines:
                    continue
                
                db_id, delete_db = request
                if db_id not in dropped_databases:
                    try:
                        with db_environment.begin(write=True) as txn:
                            txn.drop(db=databases[db_id], delete=delete_db)
                        
                        self.db_drops_num += 1
                    except lmdb_lib.MapFullError:
                        raise DBError.from_exception(db_id)
                    
                    dropped_databases.add(db_id)
                
                self.register_response(coro_id, None, None)
                processed_coroutines.add(coro_id)

        lmdb_reapplier(self.db_environment, self.databases, drop_handler)

        # get
        def get_item(txn, key_info, data_cache_buff) -> Tuple[ValueType, Optional[Exception]]:
            key, db_id = key_info
            if key_info in data_cache_buff:
                value = data_cache_buff[key_info]
            else:
                value = txn.get(key, db=self.databases[db_id])
                self.reads_num += 1
            
            exception = None
            try:
                if value is None:
                    exception = DbKeyError(key_info)
                else:
                    value = self.serializer.loads(value)
            except:
                exception = get_exception()
            
            return value, exception
            
        with self.db_environment.begin() as txn:
            for coro_id, key_info in read_queue_buff:
                value, exception = get_item(txn, key_info, data_cache_buff)
                self.register_response(coro_id, value, exception)
            
            for coro_id, set_of_key_info in massive_read_queue_buff:
                items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
                for key_info in set_of_key_info:
                    items[key_info] = get_item(txn, key_info, data_cache_buff)
                
                # Respond once per coroutine, with the full result mapping,
                # after all of its requested keys have been read.
                self.register_response(coro_id, items, None)

        # get all items
        for coro_id, db_id in get_all_items_queue_buff:
            with self.db_environment.begin(db=self.databases[db_id]) as txn:
                result = dict()
                exception = None
                try:
                    # for k, v in txn.cursor(db=self.databases[db_id]):
                    #     key = make_key_frozen(self.serializer.loads(k))
                    #     value = self.serializer.loads(v)
                    #     result[key] = value
                    result = {make_key_frozen(self.serializer.loads(k)): self.serializer.loads(v) for k, v in txn.cursor(db=self.databases[db_id])}
                    self.reads_num += len(result)
                except:
                    exception = get_exception()
                
                self.register_response(coro_id, result, exception)
        
        # sync
        self.sync_in_thread_pool()
        
        self.last_sync_time = perf_counter()

        self.make_dead()

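    # After a flush the service parks itself via make_dead(); it is revived by
    # new requests (the request handlers call make_live()) or by the timer
    # scheduled at the end of sync_in_thread_pool().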
    def in_work(self) -> bool:
        # Will be executed twice per iteration: once before and once after the
        # full_processing_iteration() execution.
        pending_reads: bool = bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)
        pending_writes: bool = bool(self.data_cache) or bool(self.deletion_cache) or bool(self.drop_db_requests)
        sync_interval_elapsed: bool = (perf_counter() - self.last_sync_time) >= self.sync_time_interval
        result: bool = pending_reads or self.force_sync or ((not self.write_locked) and pending_writes and sync_interval_elapsed)
        return self.thrifty_in_work(result)
    
    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
        if self.sync_time_interval > time_since_last_sync_time:
            return True, self.sync_time_interval - time_since_last_sync_time
        else:
            return True, 0

    def _init_db(self):
        print(f'self.path_to_db_environment: {self.path_to_db_environment}')
        self.db_environment = lmdb_lib.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=10,
                                            map_async=True, lock=False, metasync=False, sync=False, meminit=False)
        self.databases[None] = self.db_environment.open_db(self.default_db_name)
        self.db_environment.sync(True)
    
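    # Note on _init_db(): the map starts at 20 MiB and is grown on demand by
    # lmdb_reapplier(); writemap/map_async with metasync=False and sync=False
    # trade durability for write speed, which is why explicit env.sync(True)
    # calls are scheduled through sync_in_thread_pool().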
    def _open_db_environment(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        raise NotImplementedError

    def _on_set_db_environment_path(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        if self.write_locked:
            return True, False, None
        
        if self.db_environment is None:
            self.path_to_db_environment = path_to_db_environment
            try:
                self._init_db()
            except:
                exception = get_exception()
                return True, False, exception
            return True, True, None
        else:
            return True, False, None
    
    def _on_sync(self) -> ServiceProcessingResponse:
        if self.data_cache:
            self.force_sync = True
            self.make_live()
        else:
            # self.db_environment.sync(True)
            self.sync_in_thread_pool()
        
        return True, None, None
    
    def _on_get(self, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse:
        key = self.serializer.dumps(key)
        
        key_info = (key, db_id)
        if key_info in self.data_cache:
            # data_cache holds serialized values: decode before answering so
            # this fast path returns the same thing the queued read path would.
            return True, self.serializer.loads(self.data_cache[key_info]), None
        else:
            self.read_queue.append((self.current_caller_coro_info.coro_id, key_info))
            self.make_live()
            return False, None, None
    
    def _on_get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceProcessingResponse:
        coro_id = self.current_caller_coro_info.coro_id
        
        raw_keys: Set[Tuple[RawKeyType, DbId]] = set()
        for key, db_id in keys:
            key = self.serializer.dumps(key)
            
            raw_keys.add((key, db_id))
        
        self.massive_read_queue.append((coro_id, raw_keys))
        self.make_live()
        return False, None, None
    
    def _on_get_all_items(self, db_id: DbId) -> ServiceProcessingResponse:
        coro_id = self.current_caller_coro_info.coro_id
        self.get_all_items_queue.append((coro_id, db_id))
        self.make_live()
        return False, None, None
    
    def _on_put(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        key = self.serializer.dumps(key)
        
        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.data_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()
        
        self.make_live()
        return True, result, exception
    
    def _on_put_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)
            
            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.data_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()
            
            result_items[key_info] = (result, exception)
        
        self.make_live()
        return True, result_items, None
    
    def _on_delete(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        key = self.serializer.dumps(key)
        
        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.deletion_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()
        
        self.make_live()
        return True, result, exception
    
    def _on_delete_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)
            
            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.deletion_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()
            
            result_items[key_info] = (result, exception)
        
        self.make_live()
        return True, result_items, None
    
    def _open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse:
        # The environment may not exist yet if no processing iteration has run;
        # create it before opening named databases.
        if self.db_environment is None:
            self._init_db()
        
        for db_id, db_name in db_names.items():
            self.databases[db_id] = self.db_environment.open_db(db_name)
            self.db_names[db_id] = db_name
        
        self.db_environment.sync(True)
        return True, None, None
    
    def _drop_db(self, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse:
        coro_id = self.current_caller_coro_info.coro_id
        self.drop_db_requests[coro_id] = (db_id, delete)
        self.make_live()
        return False, None, None
    
    def sync_in_thread_pool(self):
        async def sync_db_coro(i: Interface, self, asyncio_loop, need_to_ensure_asyncio_loop: bool):
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
            else:
                if asyncio_loop is None:
                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())
            
            async def sync_db(self, asyncio_loop):
                def sync_worker():
                    self.db_environment.sync(True)
                
                await task_in_thread_pool(asyncio_loop, sync_worker)

            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
            self.write_locked_coro_id = None
            self.write_locked = False
            
            def make_service_live_for_a_next_sync(self):
                self.make_live()
            
            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

        asyncio_loop = None
        need_to_ensure_asyncio_loop = False
        try:
            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
        except AsyncioLoopWasNotSetError:
            need_to_ensure_asyncio_loop = True

        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
        self.write_locked = True
        self.write_locked_coro_id = coro.coro_id

LmdbRequest.default_service_type = Lmdb
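# With default_service_type set, a coroutine can await an LmdbRequest without
# naming the service explicitly (e.g. `await i(LmdbRequest().sync())`),
# assuming the usual cengal Interface call convention sketched above.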


class DbKeyError(KeyError):
    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
        super().__init__(*args)
        self.key_info: Tuple[KeyType, DbId] = key_info


class DBError(Exception):
    def __init__(self, db_id: DbId, original_exception: Exception, *args):
        super().__init__(*args)
        self.db_id: DbId = db_id
        self.original_exception = original_exception
    
    @staticmethod
    def from_exception(db_id: DbId) -> 'DBError':
        return DBError(db_id, get_exception())


def lmdb_reapplier(environment: lmdb_lib.Environment, databases: Dict[Hashable, Any], handler: Callable):
    failed = True
    while failed:
        need_to_resize: bool = False
        try:
            handler(environment, databases)
            failed = False
        except DBError as err:
            if isinstance(err.original_exception, lmdb_lib.MapFullError):
                need_to_resize = True
            else:
                # Re-raise unexpected DBErrors instead of retrying forever.
                raise
        
        if need_to_resize:
            environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
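
# lmdb_reapplier() retries the whole handler after growing the memory map by
# 2 MiB: the failed write transaction was already rolled back by its context
# manager, and each retry opens a fresh one. drop_handler above tracks
# processed_coroutines so that retries do not answer the same caller twice.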