cengal.parallel_execution.coroutines.coro_standard_services.lmdb.versions.v_0.lmdb
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['Lmdb', 'LmdbRequest', 'KeyType', 'RawKeyType', 'ValueType', 'RawValueType', 'DbId', 'DbName', 'DbKeyError']

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import *
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import CoroPriority
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import *
from cengal.parallel_execution.coroutines.coro_standard_services.timer_func_runner import *
from cengal.file_system.file_manager import path_relative_to_current_dir
from cengal.time_management.cpu_clock_cycles import perf_counter
from cengal.data_manipulation.serialization import *
from cengal.introspection.inspect import get_exception
# `Optional`, `Set` and `Union` are used by annotations below; import them
# explicitly instead of relying on the star imports above to leak them.
from typing import Hashable, Tuple, List, Any, Dict, Callable, Optional, Set, Union
import sys
import os
import asyncio
try:
    import lmdb as lmdb_lib
except ImportError:
    from warnings import warn
    warn('''WARNING: `lmdb` library is not installed. Lmdb service will not work.
    To install `lmdb` use: `pip install lmdb`''')
    raise


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


# Key as given by the caller; it is serialized before it reaches LMDB.
KeyType = Union[bytes, str, Any]
# Key after serialization.
RawKeyType = bytes
# Value as given by the caller; it is serialized before it reaches LMDB.
ValueType = Any
# Value after serialization.
RawValueType = bytes
# Caller-chosen identifier of an opened database (`None` is the default database).
DbId = Hashable
# Name under which a database is opened inside the LMDB environment.
DbName = bytes


class LmdbRequest(ServiceRequest):
    """Request builder for the `Lmdb` service.

    Every method stores a numeric request type plus its arguments through
    `_save`; the number selects the handler in `Lmdb._request_workers`.
    """

    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
        """Request type 0: set the environment path and open the environment."""
        return self._save(0, path_to_db_environment)

    def open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceRequest:
        """Request type 1: open the named databases and bind them to the given ids."""
        return self._save(1, db_names)

    def drop_db(self, db_id: DbId, delete: bool = False) -> ServiceRequest:
        """Request type 2: drop the database bound to `db_id`."""
        return self._save(2, db_id, delete)

    def sync(self) -> ServiceRequest:
        """Request type 3: ask the service to flush data to disk."""
        return self._save(3)

    def get(self, key: KeyType, db_id: DbId = None) -> ServiceRequest:
        """Request type 4: read the value stored under `key`."""
        return self._save(4, key, db_id)

    def get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceRequest:
        """Request type 5: read several `(key, db_id)` pairs at once."""
        return self._save(5, keys)

    def get_all_items(self, db_id: DbId = None) -> ServiceRequest:
        """Request type 6: read every item of the database bound to `db_id`."""
        return self._save(6, db_id)

    def put(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        """Request type 7: store `value` under `key`."""
        return self._save(7, key, value, db_id)

    def put_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        """Request type 8: store several `(key, db_id) -> value` items at once."""
        return self._save(8, items)

    def delete(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        """Request type 9: delete `key` (the serialized `value` is passed to LMDB's delete)."""
        return self._save(9, key, value, db_id)

    def delete_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        """Request type 10: delete several `(key, db_id)` items at once."""
        return self._save(10, items)

    def open_db_environment(self, path_to_db_environment: str) -> ServiceRequest:
        """Request type 11: open an environment (the service handler is not implemented)."""
        return self._save(11, path_to_db_environment)


def make_key_frozen(key):
    """Return `key` with every list or set (recursively) replaced by a tuple.

    Deserialized keys may contain lists, which cannot be used as dict keys.
    Tuples and dicts are returned untouched, so lists nested inside them are
    not converted.
    """
    if isinstance(key, (list, set)):
        return tuple(make_key_frozen(item) for item in key)

    return key


class Lmdb(Service, EntityStatsMixin):
    """Scheduler service that batches LMDB requests issued through `LmdbRequest`.

    Puts, deletes and database drops are buffered in memory and applied to the
    environment by `full_processing_iteration`. Reads that cannot be answered
    from the pending-put cache are queued and answered in that same iteration.
    Each iteration ends with a flush to disk run in a thread pool
    (`sync_in_thread_pool`), during which buffered writes are held back.

    Handlers return a `ServiceProcessingResponse` 3-tuple; as used here it is
    `(flag, result, exception)`, where the flag is presumably "response is
    ready" (queued requests return `False` and are answered later through
    `register_response`) - confirm against the `Service` base class.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(Lmdb, self).__init__(loop)
        # Name of the database bound to the `None` database id by `_init_db`.
        self.default_db_name: DbName = b'__default__'
        # Pending drops: requesting coroutine -> (db_id, delete flag).
        self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
        # Pending single reads: (requesting coroutine, (serialized key, db_id)).
        self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
        # Pending bulk reads: (requesting coroutine, set of (serialized key, db_id)).
        self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
        # Pending puts: (serialized key, db_id) -> serialized value.
        self.data_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        # Pending deletes: (serialized key, db_id) -> serialized value.
        self.deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        # Pending "read whole database" requests: (requesting coroutine, db_id).
        self.get_all_items_queue: List[Tuple[CoroID, DbId]] = list()
        self.path_to_db_environment = path_relative_to_current_dir('lmdb.db')
        # Opened lazily by `_init_db`.
        self.db_environment = None
        # db_id -> opened LMDB database handle.
        self.databases: Dict[Hashable, Any] = dict()
        # db_id -> database name, for databases opened via `_open_databases`.
        self.db_names: Dict[DbId, DbName] = dict()
        self.async_loop = None
        # Minimal number of seconds between two write/sync iterations.
        self.sync_time_interval = 1.0
        self.last_sync_time = perf_counter()
        self.force_sync = False
        # True while a background sync coroutine is running.
        self.write_locked = False
        self.writes_num: int = 0
        self.reads_num: int = 0
        self.deletes_num: int = 0
        self.db_drops_num: int = 0
        self.write_locked_coro_id: Optional[CoroID] = None
        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
        #                                    Tags.can_use_bytes,
        #                                    Tags.decode_str_as_str,
        #                                    Tags.decode_list_as_list,
        #                                    Tags.decode_bytes_as_bytes,
        #                                    Tags.superficial,
        #                                    Tags.current_platform,
        #                                    Tags.multi_platform),
        #                                   TestDataType.small,
        #                                   0.1)
        self.serializer = Serializer(Serializers.msgspec_messagepack)

        # Request type (see `LmdbRequest`) -> handler.
        self._request_workers = {
            0: self._on_set_db_environment_path,
            1: self._open_databases,
            2: self._drop_db,
            3: self._on_sync,
            4: self._on_get,
            5: self._on_get_items,
            6: self._on_get_all_items,
            7: self._on_put,
            8: self._on_put_items,
            9: self._on_delete,
            10: self._on_delete_items,
            11: self._open_db_environment,
        }

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        """Return the class name and the operation counters of this service.

        `stats_level` is accepted for interface compatibility and is not used.
        """
        return type(self).__name__, {
            'db_names': list(self.db_names.keys()),
            'writes num': self.writes_num,
            'reads num': self.reads_num,
            'deletes num': self.deletes_num,
            'db drops num': self.db_drops_num,
        }

    def single_task_registration_or_immediate_processing(
            self, *args, **kwargs) -> ServiceProcessingResponse:
        """Dispatch a request to its handler via `try_resolve_request`.

        When nothing is returned by the dispatch, an empty ready response is
        produced instead.
        """
        result = self.try_resolve_request(*args, **kwargs)
        if result is None:
            return True, None, None
        else:
            return result

    def full_processing_iteration(self):
        """Apply all buffered operations and answer all queued reads.

        Order: puts, deletes, drops, single and bulk reads, whole-database
        reads, then a background sync. Buffers are swapped with fresh ones
        first, so requests arriving meanwhile land in the next iteration.
        """
        self.force_sync = False

        if self.db_environment is None:
            self._init_db()

        data_cache_buff = self.data_cache
        self.data_cache = type(data_cache_buff)()

        read_queue_buff = self.read_queue
        self.read_queue = type(read_queue_buff)()

        massive_read_queue_buff = self.massive_read_queue
        self.massive_read_queue = type(massive_read_queue_buff)()

        deletion_cache_buff = self.deletion_cache
        self.deletion_cache = type(deletion_cache_buff)()

        get_all_items_queue_buff = self.get_all_items_queue
        self.get_all_items_queue = type(get_all_items_queue_buff)()

        # put
        def put_handler(db_environment, databases):
            # Runs the whole batch in one write transaction; `lmdb_reapplier`
            # re-runs it after enlarging the map when it is full.
            db_id = None
            try:
                with db_environment.begin(write=True) as txn:
                    for key_info, value in data_cache_buff.items():
                        key, db_id = key_info
                        txn.put(key, value, db=databases[db_id], dupdata=False, append=False)

                self.writes_num += len(data_cache_buff)
            except lmdb_lib.MapFullError:
                raise DBError.from_exception(db_id)

        lmdb_reapplier(self.db_environment, self.databases, put_handler)

        # delete
        for key_info, value in deletion_cache_buff.items():
            with self.db_environment.begin(write=True) as txn:
                key, db_id = key_info
                txn.delete(key, value, db=self.databases[db_id])
                self.deletes_num += 1

        # drop
        drop_db_requests_buff = self.drop_db_requests
        self.drop_db_requests = type(drop_db_requests_buff)()
        dropped_databases: Set[Hashable] = set()
        processed_coroutines: Set[CoroID] = set()

        def drop_handler(db_environment, databases):
            # May be re-run by `lmdb_reapplier`; the two sets above make the
            # retry skip requests that were already completed.
            for coro_id, request in drop_db_requests_buff.items():
                if coro_id in processed_coroutines:
                    continue

                db_id, delete_db = request
                if db_id not in dropped_databases:
                    try:
                        with db_environment.begin(write=True) as txn:
                            txn.drop(db=databases[db_id], delete=delete_db)

                        self.db_drops_num += 1
                    except lmdb_lib.MapFullError:
                        raise DBError.from_exception(db_id)

                    dropped_databases.add(db_id)

                self.register_response(coro_id, None, None)
                processed_coroutines.add(coro_id)

        lmdb_reapplier(self.db_environment, self.databases, drop_handler)

        # get
        def get_item(txn, key_info, data_cache_buff) -> Tuple[ValueType, Optional[Exception]]:
            # Returns (deserialized value, None) or (raw/None value, exception).
            key, db_id = key_info
            value = None
            exception = None
            try:
                if key_info in data_cache_buff:
                    value = data_cache_buff[key_info]
                else:
                    # Looked up inside the `try` so that an unknown `db_id` is
                    # reported to the requester instead of aborting the whole
                    # iteration and losing every other buffered request.
                    value = txn.get(key, db=self.databases[db_id])
                    self.reads_num += 1

                if value is None:
                    exception = DbKeyError(key_info)
                else:
                    value = self.serializer.loads(value)
            except:
                exception = get_exception()

            return value, exception

        with self.db_environment.begin() as txn:
            for coro_id, key_info in read_queue_buff:
                value, exception = get_item(txn, key_info, data_cache_buff)
                self.register_response(coro_id, value, exception)

            for coro_id, set_of_key_info in massive_read_queue_buff:
                items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
                for key_info in set_of_key_info:
                    items[key_info] = get_item(txn, key_info, data_cache_buff)

                self.register_response(coro_id, items, None)

        # get all items
        for coro_id, db_id in get_all_items_queue_buff:
            result = dict()
            exception = None
            try:
                # Database lookup is inside the `try` for the same reason as in
                # `get_item`: an unknown `db_id` must not abort the iteration.
                database = self.databases[db_id]
                with self.db_environment.begin(db=database) as txn:
                    result = {make_key_frozen(self.serializer.loads(k)): self.serializer.loads(v) for k, v in txn.cursor(db=database)}

                self.reads_num += len(result)
            except:
                exception = get_exception()

            self.register_response(coro_id, result, exception)

        # sync
        self.sync_in_thread_pool()

        self.last_sync_time = perf_counter()

        self.make_dead()

    def in_work(self) -> bool:
        """Tell whether `full_processing_iteration` has something to do now.

        True when reads are queued, when a sync was forced, or when writes are
        buffered, no background sync is running and the sync interval elapsed.
        """
        has_pending_reads: bool = bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)
        has_pending_writes: bool = bool(self.data_cache) or bool(self.deletion_cache) or bool(self.drop_db_requests)
        result: bool = has_pending_reads or (self.force_sync or ((not bool(self.write_locked)) and has_pending_writes and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
        return self.thrifty_in_work(result)

    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
        """Return `(True, seconds)` left until the sync interval elapses (0 if it already did)."""
        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
        if self.sync_time_interval > time_since_last_sync_time:
            return True, self.sync_time_interval - time_since_last_sync_time
        else:
            return True, 0

    def _init_db(self):
        """Open the environment at `path_to_db_environment` and its default database."""
        print(f'self.path_to_db_environment: {self.path_to_db_environment}')
        self.db_environment = lmdb_lib.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=10,
                                            map_async=True, lock=False, metasync=False, sync=False, meminit=False)
        self.databases[None] = self.db_environment.open_db(self.default_db_name)
        self.db_environment.sync(True)

    def _open_db_environment(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        """Handler for request type 11; not implemented."""
        raise NotImplementedError

    def _on_set_db_environment_path(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        """Set the environment path and open the environment if it is not open yet.

        The result is `True` on success and `False` when a sync is running,
        when the environment is already open, or when opening failed (the
        exception is then returned as the third element).
        """
        if self.write_locked:
            return True, False, None

        if self.db_environment is None:
            self.path_to_db_environment = path_to_db_environment
            try:
                self._init_db()
            except:
                exception = get_exception()
                return True, False, exception
            return True, True, None
        else:
            return True, False, None

    def _on_sync(self) -> ServiceProcessingResponse:
        """Force a processing iteration when puts are pending, else just sync to disk."""
        if self.data_cache:
            self.force_sync = True
            self.make_live()
        else:
            # self.db_environment.sync(True)
            self.sync_in_thread_pool()

        return True, None, None

    def _on_get(self, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse:
        """Answer a read from the pending-put cache or queue it for the next iteration."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        if key_info in self.data_cache:
            # The cache holds serialized values. Deserialize here so a read
            # served from the cache returns the same kind of object as a read
            # served by `full_processing_iteration`.
            value = self.data_cache[key_info]
            exception = None
            try:
                value = self.serializer.loads(value)
            except:
                exception = get_exception()

            return True, value, exception
        else:
            self.read_queue.append((self.current_caller_coro_info.coro_id, key_info))
            self.make_live()
            return False, None, None

    def _on_get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceProcessingResponse:
        """Queue a bulk read; the response maps `(serialized key, db_id)` to `(value, exception)`."""
        coro_id = self.current_caller_coro_info.coro_id

        raw_keys: Set[Tuple[KeyType, DbId]] = set()
        for key, db_id in keys:
            key = self.serializer.dumps(key)

            raw_keys.add((key, db_id))

        self.massive_read_queue.append((coro_id, raw_keys))
        self.make_live()
        return False, None, None

    def _on_get_all_items(self, db_id: DbId) -> ServiceProcessingResponse:
        """Queue a read of the whole database bound to `db_id`."""
        coro_id = self.current_caller_coro_info.coro_id
        self.get_all_items_queue.append((coro_id, db_id))
        self.make_live()
        return False, None, None

    def _on_put(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        """Buffer a put; the result is the serialized value, or an exception if it cannot be serialized."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.data_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()

        self.make_live()
        return True, result, exception

    def _on_put_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        """Buffer several puts; the result maps `(serialized key, db_id)` to `(serialized value, exception)`."""
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)

            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.data_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()

            result_items[key_info] = (result, exception)

        self.make_live()
        return True, result_items, None

    def _on_delete(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        """Buffer a delete; the result is the serialized value, or an exception if it cannot be serialized."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.deletion_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()

        self.make_live()
        return True, result, exception

    def _on_delete_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        """Buffer several deletes; the result maps `(serialized key, db_id)` to `(serialized value, exception)`."""
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)

            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.deletion_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()

            result_items[key_info] = (result, exception)

        self.make_live()
        return True, result_items, None

    def _open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse:
        """Open each named database, remember it under its id, then sync the environment."""
        for db_id, db_name in db_names.items():
            self.databases[db_id] = self.db_environment.open_db(db_name)
            self.db_names[db_id] = db_name

        self.db_environment.sync(True)
        return True, None, None

    def _drop_db(self, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse:
        """Queue a database drop; the caller is answered by the next processing iteration."""
        coro_id = self.current_caller_coro_info.coro_id
        self.drop_db_requests[coro_id] = (db_id, delete)
        self.make_live()
        return False, None, None

    def sync_in_thread_pool(self):
        """Start a coroutine that flushes the environment to disk in a thread pool.

        `write_locked` is set until the flush finishes, which makes `in_work`
        hold back buffered writes. Afterwards a timer wakes the service up
        again once `sync_time_interval` has passed.
        """
        async def sync_db_coro(i: Interface, self, asyncio_loop, need_to_ensure_asyncio_loop: bool):
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
            else:
                if asyncio_loop is None:
                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

            async def sync_db(self, asyncio_loop):
                def sync_worker():
                    self.db_environment.sync(True)

                await task_in_thread_pool(asyncio_loop, sync_worker)

            try:
                await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
            finally:
                # Always release the lock: if the flush failed and the lock
                # stayed set, buffered writes would never be applied again.
                self.write_locked_coro_id = None
                self.write_locked = False

            def make_service_live_for_a_next_sync(self):
                self.make_live()

            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

        asyncio_loop = None
        need_to_ensure_asyncio_loop = False
        try:
            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
        except AsyncioLoopWasNotSetError:
            need_to_ensure_asyncio_loop = True

        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
        self.write_locked = True
        self.write_locked_coro_id = coro.coro_id


LmdbRequest.default_service_type = Lmdb


class DbKeyError(KeyError):
    """Raised (or returned as a response exception) when a key is absent from a database."""

    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
        super().__init__(*args)
        # (serialized key, db_id) that was looked up.
        self.key_info: Tuple[KeyType, DbId] = key_info


class DBError(Exception):
    """Wraps an exception raised by LMDB together with the id of the affected database."""

    def __init__(self, db_id: DbId, original_exception: Exception, *args):
        super().__init__(*args)
        self.db_id: DbId = db_id
        self.original_exception = original_exception

    @staticmethod
    def from_exception(db_id: DbId) -> 'DBError':
        """Build a `DBError` around whatever `get_exception()` returns.

        Intended to be called from inside an `except` block.
        """
        return DBError(db_id, get_exception())


def lmdb_reapplier(environment: lmdb_lib.Environment, databases: Dict[Hashable, Any], handler: Callable):
    """Run `handler(environment, databases)`, growing the map while it is full.

    Each time the handler raises a `DBError` caused by `lmdb.MapFullError`,
    the environment map size is enlarged by 2 MiB and the handler is run
    again. Any other `DBError` is re-raised: retrying it without changing
    anything would loop forever. Exceptions of other types propagate as-is.
    """
    while True:
        try:
            handler(environment, databases)
            return
        except DBError as err:
            if not isinstance(err.original_exception, lmdb_lib.MapFullError):
                raise

        environment.set_mapsize(environment.info()['map_size'] + 2 * 1024**2)
class
Lmdb(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
class Lmdb(Service, EntityStatsMixin):
    """Scheduler service that batches LMDB requests issued through `LmdbRequest`.

    Puts, deletes and database drops are buffered in memory and applied to the
    environment by `full_processing_iteration`. Reads that cannot be answered
    from the pending-put cache are queued and answered in that same iteration.
    Each iteration ends with a flush to disk run in a thread pool
    (`sync_in_thread_pool`), during which buffered writes are held back.

    Handlers return a `ServiceProcessingResponse` 3-tuple; as used here it is
    `(flag, result, exception)`, where the flag is presumably "response is
    ready" (queued requests return `False` and are answered later through
    `register_response`) - confirm against the `Service` base class.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(Lmdb, self).__init__(loop)
        # Name of the database bound to the `None` database id by `_init_db`.
        self.default_db_name: DbName = b'__default__'
        # Pending drops: requesting coroutine -> (db_id, delete flag).
        self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = dict()
        # Pending single reads: (requesting coroutine, (serialized key, db_id)).
        self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = list()
        # Pending bulk reads: (requesting coroutine, set of (serialized key, db_id)).
        self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = list()
        # Pending puts: (serialized key, db_id) -> serialized value.
        self.data_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        # Pending deletes: (serialized key, db_id) -> serialized value.
        self.deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = dict()
        # Pending "read whole database" requests: (requesting coroutine, db_id).
        self.get_all_items_queue: List[Tuple[CoroID, DbId]] = list()
        self.path_to_db_environment = path_relative_to_current_dir('lmdb.db')
        # Opened lazily by `_init_db`.
        self.db_environment = None
        # db_id -> opened LMDB database handle.
        self.databases: Dict[Hashable, Any] = dict()
        # db_id -> database name, for databases opened via `_open_databases`.
        self.db_names: Dict[DbId, DbName] = dict()
        self.async_loop = None
        # Minimal number of seconds between two write/sync iterations.
        self.sync_time_interval = 1.0
        self.last_sync_time = perf_counter()
        self.force_sync = False
        # True while a background sync coroutine is running.
        self.write_locked = False
        self.writes_num: int = 0
        self.reads_num: int = 0
        self.deletes_num: int = 0
        self.db_drops_num: int = 0
        self.write_locked_coro_id: Optional[CoroID] = None
        # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
        #                                    Tags.can_use_bytes,
        #                                    Tags.decode_str_as_str,
        #                                    Tags.decode_list_as_list,
        #                                    Tags.decode_bytes_as_bytes,
        #                                    Tags.superficial,
        #                                    Tags.current_platform,
        #                                    Tags.multi_platform),
        #                                   TestDataType.small,
        #                                   0.1)
        self.serializer = Serializer(Serializers.msgspec_messagepack)

        # Request type (see `LmdbRequest`) -> handler.
        self._request_workers = {
            0: self._on_set_db_environment_path,
            1: self._open_databases,
            2: self._drop_db,
            3: self._on_sync,
            4: self._on_get,
            5: self._on_get_items,
            6: self._on_get_all_items,
            7: self._on_put,
            8: self._on_put_items,
            9: self._on_delete,
            10: self._on_delete_items,
            11: self._open_db_environment,
        }

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        """Return the class name and the operation counters of this service.

        `stats_level` is accepted for interface compatibility and is not used.
        """
        return type(self).__name__, {
            'db_names': list(self.db_names.keys()),
            'writes num': self.writes_num,
            'reads num': self.reads_num,
            'deletes num': self.deletes_num,
            'db drops num': self.db_drops_num,
        }

    def single_task_registration_or_immediate_processing(
            self, *args, **kwargs) -> ServiceProcessingResponse:
        """Dispatch a request to its handler via `try_resolve_request`.

        When nothing is returned by the dispatch, an empty ready response is
        produced instead.
        """
        result = self.try_resolve_request(*args, **kwargs)
        if result is None:
            return True, None, None
        else:
            return result

    def full_processing_iteration(self):
        """Apply all buffered operations and answer all queued reads.

        Order: puts, deletes, drops, single and bulk reads, whole-database
        reads, then a background sync. Buffers are swapped with fresh ones
        first, so requests arriving meanwhile land in the next iteration.
        """
        self.force_sync = False

        if self.db_environment is None:
            self._init_db()

        data_cache_buff = self.data_cache
        self.data_cache = type(data_cache_buff)()

        read_queue_buff = self.read_queue
        self.read_queue = type(read_queue_buff)()

        massive_read_queue_buff = self.massive_read_queue
        self.massive_read_queue = type(massive_read_queue_buff)()

        deletion_cache_buff = self.deletion_cache
        self.deletion_cache = type(deletion_cache_buff)()

        get_all_items_queue_buff = self.get_all_items_queue
        self.get_all_items_queue = type(get_all_items_queue_buff)()

        # put
        def put_handler(db_environment, databases):
            # Runs the whole batch in one write transaction; `lmdb_reapplier`
            # re-runs it after enlarging the map when it is full.
            db_id = None
            try:
                with db_environment.begin(write=True) as txn:
                    for key_info, value in data_cache_buff.items():
                        key, db_id = key_info
                        txn.put(key, value, db=databases[db_id], dupdata=False, append=False)

                self.writes_num += len(data_cache_buff)
            except lmdb_lib.MapFullError:
                raise DBError.from_exception(db_id)

        lmdb_reapplier(self.db_environment, self.databases, put_handler)

        # delete
        for key_info, value in deletion_cache_buff.items():
            with self.db_environment.begin(write=True) as txn:
                key, db_id = key_info
                txn.delete(key, value, db=self.databases[db_id])
                self.deletes_num += 1

        # drop
        drop_db_requests_buff = self.drop_db_requests
        self.drop_db_requests = type(drop_db_requests_buff)()
        dropped_databases: Set[Hashable] = set()
        processed_coroutines: Set[CoroID] = set()

        def drop_handler(db_environment, databases):
            # May be re-run by `lmdb_reapplier`; the two sets above make the
            # retry skip requests that were already completed.
            for coro_id, request in drop_db_requests_buff.items():
                if coro_id in processed_coroutines:
                    continue

                db_id, delete_db = request
                if db_id not in dropped_databases:
                    try:
                        with db_environment.begin(write=True) as txn:
                            txn.drop(db=databases[db_id], delete=delete_db)

                        self.db_drops_num += 1
                    except lmdb_lib.MapFullError:
                        raise DBError.from_exception(db_id)

                    dropped_databases.add(db_id)

                self.register_response(coro_id, None, None)
                processed_coroutines.add(coro_id)

        lmdb_reapplier(self.db_environment, self.databases, drop_handler)

        # get
        def get_item(txn, key_info, data_cache_buff) -> Tuple[ValueType, Optional[Exception]]:
            # Returns (deserialized value, None) or (raw/None value, exception).
            key, db_id = key_info
            value = None
            exception = None
            try:
                if key_info in data_cache_buff:
                    value = data_cache_buff[key_info]
                else:
                    # Looked up inside the `try` so that an unknown `db_id` is
                    # reported to the requester instead of aborting the whole
                    # iteration and losing every other buffered request.
                    value = txn.get(key, db=self.databases[db_id])
                    self.reads_num += 1

                if value is None:
                    exception = DbKeyError(key_info)
                else:
                    value = self.serializer.loads(value)
            except:
                exception = get_exception()

            return value, exception

        with self.db_environment.begin() as txn:
            for coro_id, key_info in read_queue_buff:
                value, exception = get_item(txn, key_info, data_cache_buff)
                self.register_response(coro_id, value, exception)

            for coro_id, set_of_key_info in massive_read_queue_buff:
                items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
                for key_info in set_of_key_info:
                    items[key_info] = get_item(txn, key_info, data_cache_buff)

                self.register_response(coro_id, items, None)

        # get all items
        for coro_id, db_id in get_all_items_queue_buff:
            result = dict()
            exception = None
            try:
                # Database lookup is inside the `try` for the same reason as in
                # `get_item`: an unknown `db_id` must not abort the iteration.
                database = self.databases[db_id]
                with self.db_environment.begin(db=database) as txn:
                    result = {make_key_frozen(self.serializer.loads(k)): self.serializer.loads(v) for k, v in txn.cursor(db=database)}

                self.reads_num += len(result)
            except:
                exception = get_exception()

            self.register_response(coro_id, result, exception)

        # sync
        self.sync_in_thread_pool()

        self.last_sync_time = perf_counter()

        self.make_dead()

    def in_work(self) -> bool:
        """Tell whether `full_processing_iteration` has something to do now.

        True when reads are queued, when a sync was forced, or when writes are
        buffered, no background sync is running and the sync interval elapsed.
        """
        has_pending_reads: bool = bool(self.read_queue) or bool(self.massive_read_queue) or bool(self.get_all_items_queue)
        has_pending_writes: bool = bool(self.data_cache) or bool(self.deletion_cache) or bool(self.drop_db_requests)
        result: bool = has_pending_reads or (self.force_sync or ((not bool(self.write_locked)) and has_pending_writes and ((perf_counter() - self.last_sync_time) >= self.sync_time_interval)))
        return self.thrifty_in_work(result)

    def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
        """Return `(True, seconds)` left until the sync interval elapses (0 if it already did)."""
        time_since_last_sync_time: float = perf_counter() - self.last_sync_time
        if self.sync_time_interval > time_since_last_sync_time:
            return True, self.sync_time_interval - time_since_last_sync_time
        else:
            return True, 0

    def _init_db(self):
        """Open the environment at `path_to_db_environment` and its default database."""
        print(f'self.path_to_db_environment: {self.path_to_db_environment}')
        self.db_environment = lmdb_lib.open(self.path_to_db_environment, map_size=20 * 1024**2, writemap=True, max_dbs=10,
                                            map_async=True, lock=False, metasync=False, sync=False, meminit=False)
        self.databases[None] = self.db_environment.open_db(self.default_db_name)
        self.db_environment.sync(True)

    def _open_db_environment(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        """Handler for request type 11; not implemented."""
        raise NotImplementedError

    def _on_set_db_environment_path(self, path_to_db_environment: str) -> ServiceProcessingResponse:
        """Set the environment path and open the environment if it is not open yet.

        The result is `True` on success and `False` when a sync is running,
        when the environment is already open, or when opening failed (the
        exception is then returned as the third element).
        """
        if self.write_locked:
            return True, False, None

        if self.db_environment is None:
            self.path_to_db_environment = path_to_db_environment
            try:
                self._init_db()
            except:
                exception = get_exception()
                return True, False, exception
            return True, True, None
        else:
            return True, False, None

    def _on_sync(self) -> ServiceProcessingResponse:
        """Force a processing iteration when puts are pending, else just sync to disk."""
        if self.data_cache:
            self.force_sync = True
            self.make_live()
        else:
            # self.db_environment.sync(True)
            self.sync_in_thread_pool()

        return True, None, None

    def _on_get(self, key: KeyType, db_id: DbId = None) -> ServiceProcessingResponse:
        """Answer a read from the pending-put cache or queue it for the next iteration."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        if key_info in self.data_cache:
            # The cache holds serialized values. Deserialize here so a read
            # served from the cache returns the same kind of object as a read
            # served by `full_processing_iteration`.
            value = self.data_cache[key_info]
            exception = None
            try:
                value = self.serializer.loads(value)
            except:
                exception = get_exception()

            return True, value, exception
        else:
            self.read_queue.append((self.current_caller_coro_info.coro_id, key_info))
            self.make_live()
            return False, None, None

    def _on_get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceProcessingResponse:
        """Queue a bulk read; the response maps `(serialized key, db_id)` to `(value, exception)`."""
        coro_id = self.current_caller_coro_info.coro_id

        raw_keys: Set[Tuple[KeyType, DbId]] = set()
        for key, db_id in keys:
            key = self.serializer.dumps(key)

            raw_keys.add((key, db_id))

        self.massive_read_queue.append((coro_id, raw_keys))
        self.make_live()
        return False, None, None

    def _on_get_all_items(self, db_id: DbId) -> ServiceProcessingResponse:
        """Queue a read of the whole database bound to `db_id`."""
        coro_id = self.current_caller_coro_info.coro_id
        self.get_all_items_queue.append((coro_id, db_id))
        self.make_live()
        return False, None, None

    def _on_put(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        """Buffer a put; the result is the serialized value, or an exception if it cannot be serialized."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.data_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()

        self.make_live()
        return True, result, exception

    def _on_put_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        """Buffer several puts; the result maps `(serialized key, db_id)` to `(serialized value, exception)`."""
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)

            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.data_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()

            result_items[key_info] = (result, exception)

        self.make_live()
        return True, result_items, None

    def _on_delete(self, key: KeyType, value: Any, db_id: DbId = None) -> ServiceProcessingResponse:
        """Buffer a delete; the result is the serialized value, or an exception if it cannot be serialized."""
        key = self.serializer.dumps(key)

        key_info = (key, db_id)
        exception = None
        result = None
        try:
            result = self.deletion_cache[key_info] = self.serializer.dumps(value)
        except:
            exception = get_exception()

        self.make_live()
        return True, result, exception

    def _on_delete_items(self, items: Dict[Tuple[KeyType, DbId], ValueType]) -> ServiceProcessingResponse:
        """Buffer several deletes; the result maps `(serialized key, db_id)` to `(serialized value, exception)`."""
        result_items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
        for key_info, value in items.items():
            key, db_id = key_info
            key = self.serializer.dumps(key)

            key_info = (key, db_id)
            exception = None
            result = None
            try:
                result = self.deletion_cache[key_info] = self.serializer.dumps(value)
            except:
                exception = get_exception()

            result_items[key_info] = (result, exception)

        self.make_live()
        return True, result_items, None

    def _open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceProcessingResponse:
        """Open each named database, remember it under its id, then sync the environment."""
        for db_id, db_name in db_names.items():
            self.databases[db_id] = self.db_environment.open_db(db_name)
            self.db_names[db_id] = db_name

        self.db_environment.sync(True)
        return True, None, None

    def _drop_db(self, db_id: DbId, delete: bool = False) -> ServiceProcessingResponse:
        """Queue a database drop; the caller is answered by the next processing iteration."""
        coro_id = self.current_caller_coro_info.coro_id
        self.drop_db_requests[coro_id] = (db_id, delete)
        self.make_live()
        return False, None, None

    def sync_in_thread_pool(self):
        """Start a coroutine that flushes the environment to disk in a thread pool.

        `write_locked` is set until the flush finishes, which makes `in_work`
        hold back buffered writes. Afterwards a timer wakes the service up
        again once `sync_time_interval` has passed.
        """
        async def sync_db_coro(i: Interface, self, asyncio_loop, need_to_ensure_asyncio_loop: bool):
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None,CoroPriority.low, True))
            else:
                if asyncio_loop is None:
                    asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

            async def sync_db(self, asyncio_loop):
                def sync_worker():
                    self.db_environment.sync(True)

                await task_in_thread_pool(asyncio_loop, sync_worker)

            try:
                await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
            finally:
                # Always release the lock: if the flush failed and the lock
                # stayed set, buffered writes would never be applied again.
                self.write_locked_coro_id = None
                self.write_locked = False

            def make_service_live_for_a_next_sync(self):
                self.make_live()

            await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

        asyncio_loop = None
        need_to_ensure_asyncio_loop = False
        try:
            asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
        except AsyncioLoopWasNotSetError:
            need_to_ensure_asyncio_loop = True

        coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
        self.write_locked = True
        self.write_locked_coro_id = coro.coro_id
Lmdb( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
def __init__(self, loop: CoroSchedulerType):
    """Create an idle service with empty buffers, zeroed counters and no open environment.

    The LMDB environment is not opened here: ``db_environment`` starts as
    ``None`` and ``full_processing_iteration`` calls ``_init_db`` while it is
    still unset.
    """
    super(Lmdb, self).__init__(loop)

    # Environment and opened databases.
    self.default_db_name: DbName = b'__default__'
    self.path_to_db_environment = path_relative_to_current_dir('lmdb.db')
    self.db_environment = None
    self.databases: Dict[Hashable, Any] = {}
    self.db_names: Dict[DbId, DbName] = {}

    # Requests buffered until the next full_processing_iteration().
    self.drop_db_requests: Dict[CoroID, Tuple[Hashable, bool]] = {}
    self.read_queue: List[Tuple[CoroID, Tuple[RawKeyType, DbId]]] = []
    self.massive_read_queue: List[Tuple[CoroID, Set[Tuple[KeyType, DbId]]]] = []
    self.get_all_items_queue: List[Tuple[CoroID, DbId]] = []
    # Serialized values keyed by (serialized key, db_id).
    self.data_cache: Dict[Tuple[Hashable, Hashable], Any] = {}
    self.deletion_cache: Dict[Tuple[Hashable, Hashable], Any] = {}

    # Sync scheduling; write_locked/write_locked_coro_id are managed by
    # sync_in_thread_pool().
    self.async_loop = None
    self.sync_time_interval = 1.0
    self.last_sync_time = perf_counter()
    self.force_sync = False
    self.write_locked = False
    self.write_locked_coro_id: Optional[CoroID] = None

    # Counters reported by get_entity_stats().
    self.writes_num: int = 0
    self.reads_num: int = 0
    self.deletes_num: int = 0
    self.db_drops_num: int = 0

    # self.serializer = best_serializer_for_standard_data((DataFormats.binary,
    #                                    Tags.can_use_bytes,
    #                                    Tags.decode_str_as_str,
    #                                    Tags.decode_list_as_list,
    #                                    Tags.decode_bytes_as_bytes,
    #                                    Tags.superficial,
    #                                    Tags.current_platform,
    #                                    Tags.multi_platform),
    #                                   TestDataType.small,
    #                                   0.1)
    self.serializer = Serializer(Serializers.msgspec_messagepack)

    # Position in this tuple is the request type id used by LmdbRequest._save().
    handlers_by_request_type = (
        self._on_set_db_environment_path,  # 0
        self._open_databases,              # 1
        self._drop_db,                     # 2
        self._on_sync,                     # 3
        self._on_get,                      # 4
        self._on_get_items,                # 5
        self._on_get_all_items,            # 6
        self._on_put,                      # 7
        self._on_put_items,                # 8
        self._on_delete,                   # 9
        self._on_delete_items,             # 10
        self._open_db_environment,         # 11
    )
    self._request_workers = dict(enumerate(handlers_by_request_type))
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
    """Return the service class name together with its operation counters.

    ``stats_level`` is accepted for interface compatibility; this
    implementation does not read it.
    """
    entity_name = type(self).__name__
    counters: Dict[str, Any] = {}
    counters['db_names'] = list(self.db_names)
    counters['writes num'] = self.writes_num
    counters['reads num'] = self.reads_num
    counters['deletes num'] = self.deletes_num
    counters['db drops num'] = self.db_drops_num
    return entity_name, counters
def
single_task_registration_or_immediate_processing( self, *args, **kwargs) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
def
full_processing_iteration(self):
def full_processing_iteration(self):
    """Flush buffered writes/deletes/drops, answer queued reads, then start a sync.

    Steps, in order: put -> delete -> drop -> get -> get all items -> sync.
    Each request buffer is replaced by a fresh empty container before it is
    processed, so whatever handlers register from that point on is kept for
    a later iteration.
    """
    self.force_sync = False

    if self.db_environment is None:
        self._init_db()

    # Detach the current buffers and install empty ones of the same type.
    data_cache_buff = self.data_cache
    self.data_cache = type(data_cache_buff)()

    read_queue_buff = self.read_queue
    self.read_queue = type(read_queue_buff)()

    massive_read_queue_buff = self.massive_read_queue
    self.massive_read_queue = type(massive_read_queue_buff)()

    deletion_cache_buff = self.deletion_cache
    self.deletion_cache = type(deletion_cache_buff)()

    get_all_items_queue_buff = self.get_all_items_queue
    self.get_all_items_queue = type(get_all_items_queue_buff)()

    # put: all buffered values are written inside a single write transaction.
    def put_handler(db_environment, databases):
        # Tracks the database being written so a MapFullError can be attributed to it.
        db_id = None
        try:
            with db_environment.begin(write=True) as txn:
                for key_info, value in data_cache_buff.items():
                    key, db_id = key_info
                    txn.put(key, value, db=databases[db_id], dupdata=False, append=False)

            self.writes_num += len(data_cache_buff)
        except lmdb_lib.MapFullError:
            raise DBError.from_exception(db_id)

    # NOTE(review): lmdb_reapplier is defined elsewhere; presumably it re-runs the
    # handler after a DBError (e.g. once the map has been enlarged) - confirm.
    lmdb_reapplier(self.db_environment, self.databases, put_handler)

    # delete: one write transaction per buffered deletion.
    for key_info, value in deletion_cache_buff.items():
        with self.db_environment.begin(write=True) as txn:
            key, db_id = key_info
            txn.delete(key, value, db=self.databases[db_id])
            self.deletes_num += 1

    # drop
    drop_db_requests_buff = self.drop_db_requests
    self.drop_db_requests = type(drop_db_requests_buff)()
    # Both sets live outside drop_handler, so a repeated call of the handler
    # skips databases already dropped and coroutines already answered.
    dropped_databases: Set[Hashable] = set()
    processed_coroutines: Set[CoroID] = set()

    def drop_handler(db_environment, databases):
        for coro_id, request in drop_db_requests_buff.items():
            if coro_id in processed_coroutines:
                continue

            db_id, delete_db = request
            if db_id not in dropped_databases:
                try:
                    with db_environment.begin(write=True) as txn:
                        txn.drop(db=databases[db_id], delete=delete_db)

                    self.db_drops_num += 1
                except lmdb_lib.MapFullError:
                    raise DBError.from_exception(db_id)

                dropped_databases.add(db_id)

            # Every requester gets a response, including those asking for a
            # database that an earlier request in this batch already dropped.
            self.register_response(coro_id, None, None)
            processed_coroutines.add(coro_id)

    lmdb_reapplier(self.db_environment, self.databases, drop_handler)

    # get
    def get_item(txn, key_info, data_cache_buff) -> Tuple[ValueType, Optional[Exception]]:
        """Return (value, exception) for one (key, db_id) pair."""
        key, db_id = key_info
        # Values written during this iteration are served from the detached
        # put buffer; only other keys are looked up in the database.
        if key_info in data_cache_buff:
            value = data_cache_buff[key_info]
        else:
            # NOTE(review): an unknown db_id raises KeyError here, outside the
            # try below, so it propagates out of the whole iteration.
            value = txn.get(key, db=self.databases[db_id])
            self.reads_num += 1

        exception = None
        try:
            if value is None:
                # txn.get() produced no value for this key.
                exception = DbKeyError(key_info)
            else:
                value = self.serializer.loads(value)
        except:
            exception = get_exception()

        return value, exception

    # Single-key and multi-key reads share one read transaction.
    with self.db_environment.begin() as txn:
        for coro_id, key_info in read_queue_buff:
            value, exception = get_item(txn, key_info, data_cache_buff)
            self.register_response(coro_id, value, exception)

        for coro_id, set_of_key_info in massive_read_queue_buff:
            # Per-key failures are reported inside the result dict, not as the response exception.
            items: Dict[Tuple[KeyType, DbId], Tuple[ValueType, Optional[Exception]]] = dict()
            for key_info in set_of_key_info:
                items[key_info] = get_item(txn, key_info, data_cache_buff)
            self.register_response(coro_id, items, None)

    # get all items: one read transaction per request.
    for coro_id, db_id in get_all_items_queue_buff:
        with self.db_environment.begin(db=self.databases[db_id]) as txn:
            result = dict()
            exception = None
            try:
                # for k, v in txn.cursor(db=self.databases[db_id]):
                #     key = make_key_frozen(self.serializer.loads(k))
                #     value = self.serializer.loads(v)
                #     result[key] = value
                result = {make_key_frozen(self.serializer.loads(k)): self.serializer.loads(v) for k, v in txn.cursor(db=self.databases[db_id])}
                self.reads_num += len(result)
            except:
                exception = get_exception()

            self.register_response(coro_id, result, exception)

    # sync: started through sync_in_thread_pool(), which also sets write_locked.
    self.sync_in_thread_pool()

    self.last_sync_time = perf_counter()

    self.make_dead()
def
in_work(self) -> bool:
def in_work(self) -> bool:
    """Tell the scheduler whether this service has work for a processing iteration.

    Work exists when any read request is queued, when a sync is forced, or
    when buffered puts/deletions/drops are waiting, no sync holds the write
    lock and at least ``sync_time_interval`` has passed since
    ``last_sync_time``. The verdict is passed through ``thrifty_in_work``.
    """
    if self.read_queue or self.massive_read_queue or self.get_all_items_queue:
        # Queued reads are served regardless of the write lock or the timer.
        busy = True
    elif self.force_sync:
        busy = self.force_sync
    elif self.write_locked:
        # A sync is still in flight; buffered modifications have to wait.
        busy = False
    elif not (self.data_cache or self.deletion_cache or self.drop_db_requests):
        busy = False
    else:
        busy = (perf_counter() - self.last_sync_time) >= self.sync_time_interval

    return self.thrifty_in_work(busy)
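Will be executed twice per iteration: once before and once after the full_processing_iteration() execution.
Raises: NotImplementedError: listed in the inherited docstring; nothing in this override raises it directly.
Returns: bool: the result of thrifty_in_work() applied to whether read requests are queued, a sync is forced, or buffered writes, deletions or drops are due while no sync holds the write lock.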
def
time_left_before_next_event(self) -> Tuple[bool, Union[int, float, NoneType]]:
def time_left_before_next_event(self) -> Tuple[bool, Optional[Union[int, float]]]:
    """Return ``(True, t)`` where ``t`` is the time remaining until the next sync is due.

    ``t`` is ``sync_time_interval`` minus the time elapsed since
    ``last_sync_time``, or the integer ``0`` once the interval has passed.
    """
    interval = self.sync_time_interval
    elapsed: float = perf_counter() - self.last_sync_time
    remaining = interval - elapsed if interval > elapsed else 0
    return True, remaining
def
sync_in_thread_pool(self):
def sync_in_thread_pool(self):
    """Start a background coroutine that syncs the LMDB environment off-thread.

    The service is marked write-locked (``write_locked`` /
    ``write_locked_coro_id``) as soon as the coroutine has been put on the
    scheduler. The coroutine obtains an asyncio loop, runs
    ``db_environment.sync(True)`` through ``task_in_thread_pool``, releases
    the write lock, and finally hands ``make_live`` to ``TimerFuncRunner``
    together with ``sync_time_interval``.

    The write lock is released in a ``finally`` block: if acquiring the loop
    or the sync itself raises, the lock must still be cleared, otherwise
    ``in_work`` would never again report buffered writes as due.
    """
    async def sync_db_coro(i: Interface, self, asyncio_loop, need_to_ensure_asyncio_loop: bool):
        try:
            if need_to_ensure_asyncio_loop:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().ensure_loop(None, CoroPriority.low, True))
            elif asyncio_loop is None:
                asyncio_loop = await i(AsyncioLoop, AsyncioLoopRequest().get())

            async def sync_db(self, asyncio_loop):
                def sync_worker():
                    self.db_environment.sync(True)

                await task_in_thread_pool(asyncio_loop, sync_worker)

            await i(AsyncioLoop, AsyncioLoopRequest().wait(sync_db(self, asyncio_loop)))
        finally:
            # Always give the write lock back, even when the sync failed.
            self.write_locked_coro_id = None
            self.write_locked = False

        def make_service_live_for_a_next_sync(self):
            self.make_live()

        # NOTE(review): presumably invokes the callback after sync_time_interval - confirm
        # against TimerFuncRunner.
        await i(TimerFuncRunner, self.sync_time_interval, make_service_live_for_a_next_sync, self)

    asyncio_loop = None
    need_to_ensure_asyncio_loop = False
    try:
        asyncio_loop = self._loop.get_service_instance(AsyncioLoop).inline_get()
    except AsyncioLoopWasNotSetError:
        # No loop registered yet; the coroutine will request one itself.
        need_to_ensure_asyncio_loop = True

    coro: CoroWrapperBase = self._loop.put_coro(sync_db_coro, self, asyncio_loop, need_to_ensure_asyncio_loop)
    self.write_locked = True
    self.write_locked_coro_id = coro.coro_id
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
class
LmdbRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class LmdbRequest(ServiceRequest):
    """Request builder for the ``Lmdb`` service.

    Every method stores a numeric request type plus its arguments through
    ``_save``; ``Lmdb._request_workers`` maps that number to the handler
    named in each method's docstring.
    """

    def set_db_environment_path(self, path_to_db_environment: str) -> ServiceRequest:
        """Request type 0, handled by ``Lmdb._on_set_db_environment_path``."""
        return self._save(0, path_to_db_environment)

    def open_databases(self, db_names: Dict[DbId, DbName]) -> ServiceRequest:
        """Request type 1, handled by ``Lmdb._open_databases``.

        The handler opens every named database in the environment and
        registers it under its ``db_id``.
        """
        return self._save(1, db_names)

    def drop_db(self, db_id: DbId, delete: bool = False) -> ServiceRequest:
        """Request type 2, handled by ``Lmdb._drop_db``.

        The drop is queued and executed during the next processing iteration;
        ``delete`` is forwarded to ``txn.drop(..., delete=...)``.
        """
        return self._save(2, db_id, delete)

    def sync(self) -> ServiceRequest:
        """Request type 3, handled by ``Lmdb._on_sync``."""
        return self._save(3)

    def get(self, key: KeyType, db_id: DbId = None) -> ServiceRequest:
        """Request type 4, handled by ``Lmdb._on_get``: read one key from ``db_id``."""
        return self._save(4, key, db_id)

    def get_items(self, keys: Set[Tuple[KeyType, DbId]]) -> ServiceRequest:
        """Request type 5, handled by ``Lmdb._on_get_items``: read several ``(key, db_id)`` pairs."""
        return self._save(5, keys)

    def get_all_items(self, db_id: DbId = None) -> ServiceRequest:
        """Request type 6, handled by ``Lmdb._on_get_all_items``: read every item of ``db_id``."""
        return self._save(6, db_id)

    def put(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        """Request type 7, handled by ``Lmdb._on_put``: store ``value`` under ``key`` in ``db_id``."""
        return self._save(7, key, value, db_id)

    def put_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        """Request type 8, handled by ``Lmdb._on_put_items``.

        The handler serializes each key and value, buffers them for the next
        write, and responds with a dict of per-item ``(result, exception)``
        pairs.
        """
        return self._save(8, items)

    def delete(self, key: KeyType, value: Optional[ValueType] = None, db_id: DbId = None) -> ServiceRequest:
        """Request type 9, handled by ``Lmdb._on_delete``.

        ``value`` is serialized by the handler and later passed to
        ``txn.delete(key, value, ...)``.
        """
        return self._save(9, key, value, db_id)

    def delete_items(self, items: Dict[Tuple[KeyType, DbId], Optional[ValueType]]) -> ServiceRequest:
        """Request type 10, handled by ``Lmdb._on_delete_items``: batch form of ``delete``."""
        return self._save(10, items)

    def open_db_environment(self, path_to_db_environment: str) -> ServiceRequest:
        """Request type 11, handled by ``Lmdb._open_db_environment``."""
        return self._save(11, path_to_db_environment)
def
set_db_environment_path( self, path_to_db_environment: str) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
open_databases( self, db_names: typing.Dict[typing.Hashable, bytes]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
drop_db( self, db_id: typing.Hashable, delete: bool = False) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
sync( self) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
get( self, key: typing.Union[bytes, str, typing.Any], db_id: typing.Hashable = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
get_items( self, keys: typing.Set[typing.Tuple[typing.Union[bytes, str, typing.Any], typing.Hashable]]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
get_all_items( self, db_id: typing.Hashable = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
put( self, key: typing.Union[bytes, str, typing.Any], value: typing.Union[typing.Any, NoneType] = None, db_id: typing.Hashable = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
put_items( self, items: typing.Dict[typing.Tuple[typing.Union[bytes, str, typing.Any], typing.Hashable], typing.Union[typing.Any, NoneType]]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
delete( self, key: typing.Union[bytes, str, typing.Any], value: typing.Union[typing.Any, NoneType] = None, db_id: typing.Hashable = None) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
delete_items( self, items: typing.Dict[typing.Tuple[typing.Union[bytes, str, typing.Any], typing.Hashable], typing.Union[typing.Any, NoneType]]) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
def
open_db_environment( self, path_to_db_environment: str) -> cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest:
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'Lmdb'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
KeyType =
typing.Union[bytes, str, typing.Any]
RawKeyType =
<class 'bytes'>
ValueType =
typing.Any
RawValueType =
<class 'bytes'>
DbId =
typing.Hashable
DbName =
<class 'bytes'>
class
DbKeyError(builtins.KeyError):
class DbKeyError(KeyError):
    """Reports that a requested ``(key, db_id)`` pair has no value in the database.

    Attributes:
        key_info: the ``(key, db_id)`` pair that was looked up.

    ``key_info`` is intentionally kept out of ``args``; only the extra
    positional arguments are forwarded to ``KeyError``.
    """

    def __init__(self, key_info: Tuple[KeyType, DbId], *args: object) -> None:
        super().__init__(*args)
        self.key_info: Tuple[KeyType, DbId] = key_info

    def __reduce__(self):
        """Support ``pickle``/``copy`` by replaying ``key_info`` to the constructor.

        The default ``BaseException.__reduce__`` rebuilds the object from
        ``args`` alone, which omits the required ``key_info`` argument and
        therefore raises ``TypeError`` on reconstruction.
        """
        return type(self), (self.key_info, *self.args), vars(self)
Mapping key not found.
DbKeyError( key_info: typing.Tuple[typing.Union[bytes, str, typing.Any], typing.Hashable], *args: object)
Inherited Members
- builtins.BaseException
- with_traceback
- args