cengal.parallel_execution.coroutines.coro_standard_services.read_write_locker.versions.v_0.read_write_locker
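Read/write locker service for the Cengal coroutine scheduler: a lockable entity is registered by id and then acquired for reading or writing through a context manager (sync or async). Docstrings: http://www.python.org/dev/peps/pep-0257/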
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Read/write locker service for coroutines.

A lockable entity is identified by a hashable id. A coroutine obtains a
context manager for the entity (``get_rw_lock`` / ``aget_rw_lock``), selects
the operation by calling it with an ``RWOperation`` and enters it with
``with`` or ``async with``. When the lock cannot be taken immediately the
coroutine sends a request to the ``RWLocker`` service and is answered from
``RWLocker.full_processing_iteration``.

Docstrings: http://www.python.org/dev/peps/pep-0257/
"""


__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


__all__ = [
    'RWOperation', 'RWLockerRequest', 'RWLockerEntity', 'RWLockerContextManager', 'FakeRWLockerContextManager', 'UnknownLockerEntity', 'RWLocker', 'get_rw_lock', 'grwl', 'aget_rw_lock', 'agrwl'
]

from cengal.parallel_execution.coroutines.coro_scheduler import *
from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
from enum import Enum
from typing import Dict, Hashable, Tuple, Union, Type, Optional, Any, List, Set
from cengal.time_management.repeat_for_a_time import Tracer
from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence
from async_generator import asynccontextmanager, async_generator, yield_
import asyncio


class RWOperation(Enum):
    """Kind of access requested on a locker entity."""

    read = 0
    write = 1


class RWLockerRequest(ServiceRequest):
    """Request builders for the ``RWLocker`` service.

    The first argument given to ``_save`` is the request type; ``RWLocker``
    maps 0..3 to its ``_on_register``, ``_on_deregister``,
    ``_on_wait_for_write`` and ``_on_wait_for_read`` handlers.
    """

    def register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> 'RWLockerContextManager':
        """Build a request (type 0) that registers ``entity_id``."""
        return self._save(0, entity_id, max_writers_in_progress, max_readers_in_progress, recursive)

    def deregister(self, entity_id: Hashable, safe: bool = True) -> bool:
        """Build a request (type 1) that removes ``entity_id``."""
        return self._save(1, entity_id, safe)

    def wait_for_write(self, entity_id: Hashable) -> None:
        """Build a request (type 2) that waits for a write lock on ``entity_id``."""
        return self._save(2, entity_id)

    def wait_for_read(self, entity_id: Hashable) -> None:
        """Build a request (type 3) that waits for a read lock on ``entity_id``."""
        return self._save(3, entity_id)


class RWLockerEntity:
    """Lock state of a single entity.

    A negative ``max_writers_in_progress`` / ``max_readers_in_progress``
    disables the corresponding limit. When ``recursive`` is true, holders are
    counted per coroutine id and a coroutine that already holds the lock is
    always allowed to take it again.
    """

    def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
        self.entity_id: Hashable = entity_id
        self.recursive: bool = recursive
        self.service = service
        self.writers_pending: int = 0
        self.writers_in_progress_dict: Dict[CoroID, int] = dict()
        self.writers_in_progress: int = 0
        self.max_writers_in_progress: int = max_writers_in_progress  # Must be edited directly from coroutine. In order to eliminate new writers arrived during the end of the current loop iteration
        self.readers_pending: int = 0
        self.readers_in_progress_dict: Dict[CoroID, int] = dict()
        self.readers_in_progress: int = 0
        self.max_readers_in_progress: int = max_readers_in_progress  # Must be edited directly from coroutine. In order to eliminate new readers arrived during the end of the current loop iteration
        self.last_operation: RWOperation = RWOperation.read  # Default is 'RWOperation.read' in order to force 'write' as a first operation among several first concurrent operations
        self.waiting_coroutines: Set[CoroID] = set()
        self.related_coroutines: Set[CoroID] = set()

    def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
        """Return True when one more writer (``coro_id``) fits into the writers limit."""
        if 0 > self.max_writers_in_progress:
            return True  # negative limit means "unlimited"

        if self.recursive and (coro_id in self.writers_in_progress_dict):
            return True  # re-entry by a coroutine that already writes

        return self.writers_in_progress < self.max_writers_in_progress

    def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Account for one more write hold taken by ``coro_id``."""
        if self.recursive:
            if coro_id not in self.writers_in_progress_dict:
                self.writers_in_progress_dict[coro_id] = 0
                self.writers_in_progress += 1

            self.writers_in_progress_dict[coro_id] += 1
        else:
            self.writers_in_progress += 1

    def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Release one write hold of ``coro_id``.

        Raises:
            KeyError: in recursive mode, when ``coro_id`` holds no write lock.
        """
        if self.recursive:
            self.writers_in_progress_dict[coro_id] -= 1
            if 0 >= self.writers_in_progress_dict[coro_id]:
                del self.writers_in_progress_dict[coro_id]
                self.writers_in_progress -= 1
        else:
            self.writers_in_progress -= 1

    def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
        """Return True when one more reader (``coro_id``) fits into the readers limit."""
        if 0 > self.max_readers_in_progress:
            return True  # negative limit means "unlimited"

        if self.recursive and (coro_id in self.readers_in_progress_dict):
            return True  # re-entry by a coroutine that already reads

        return self.readers_in_progress < self.max_readers_in_progress

    def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Account for one more read hold taken by ``coro_id``."""
        if self.recursive:
            if coro_id not in self.readers_in_progress_dict:
                self.readers_in_progress_dict[coro_id] = 0
                self.readers_in_progress += 1

            self.readers_in_progress_dict[coro_id] += 1
        else:
            self.readers_in_progress += 1

    def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Release one read hold of ``coro_id``.

        Raises:
            KeyError: in recursive mode, when ``coro_id`` holds no read lock.
        """
        if self.recursive:
            self.readers_in_progress_dict[coro_id] -= 1
            if 0 >= self.readers_in_progress_dict[coro_id]:
                del self.readers_in_progress_dict[coro_id]
                self.readers_in_progress -= 1
        else:
            self.readers_in_progress -= 1

    def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
        """Try to take the write lock for ``coro_id``.

        With ``apply=False`` nothing is modified; the call only reports
        whether the lock could be taken right now.

        Returns:
            True when the caller has to try again later, False when the lock
            is (or, with ``apply=False``, could be) taken.
        """
        need_to_try_later = False
        if self.readers_in_progress or self.readers_pending:
            # Readers compete for the entity: a writer goes first only when no
            # reader is active and the previous granted operation was a read.
            if (not self.readers_in_progress) and self.readers_pending and (RWOperation.read == self.last_operation) and self.check_writers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_writers_in_progress(coro_id)
                    self.last_operation = RWOperation.write
            else:
                need_to_try_later = True
        else:
            if self.check_writers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_writers_in_progress(coro_id)
                    self.last_operation = RWOperation.write
            else:
                need_to_try_later = True

        return need_to_try_later

    def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
        """Try to take the read lock for ``coro_id``.

        With ``apply=False`` nothing is modified; the call only reports
        whether the lock could be taken right now.

        Returns:
            True when the caller has to try again later, False when the lock
            is (or, with ``apply=False``, could be) taken.
        """
        need_to_try_later = False
        if self.writers_in_progress or self.writers_pending:
            # Writers compete for the entity: a reader goes first only when no
            # writer is active and the previous granted operation was a write.
            if (not self.writers_in_progress) and self.writers_pending and (RWOperation.write == self.last_operation) and self.check_readers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_readers_in_progress(coro_id)
                    self.last_operation = RWOperation.read
            else:
                need_to_try_later = True
        else:
            if self.check_readers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_readers_in_progress(coro_id)
                    self.last_operation = RWOperation.read
            else:
                need_to_try_later = True

        return need_to_try_later

    def test_remove(self) -> bool:
        """Return True while some coroutine still waits for this entity (removal must be retried)."""
        # bool() keeps the annotated contract and does not leak the internal set.
        return bool(self.waiting_coroutines)


class RWLockerContextManagerBase:
    """State shared by the real and the fake lock context managers."""

    def __init__(self, core: RWLockerEntity) -> None:
        self.core: RWLockerEntity = core
        self.current_context_operation: Optional[RWOperation] = None

    def _caller_coro_id(self) -> Optional[CoroID]:
        """Return the coroutine id of the bound interface, or None when no interface is bound."""
        # FakeRWLockerContextManager never sets `_interface`; it accounts holds under None.
        interface = getattr(self, '_interface', None)
        return None if interface is None else interface.coro_id

    def lockable(self, operation: Optional[RWOperation] = None) -> bool:
        """Report whether ``operation`` (read by default) could be locked right now, without locking."""
        if operation is None:
            operation = RWOperation.read

        coro_id = self._caller_coro_id()
        if RWOperation.write == operation:
            need_service_assistance = self.core.try_write_lock(coro_id, False)
        else:
            need_service_assistance = self.core.try_read_lock(coro_id, False)

        return not need_service_assistance

    def change_max_boundaries(self, max_writers_in_progress: int, max_readers_in_progress: int):
        """Replace both concurrency limits of the underlying entity."""
        self.core.max_writers_in_progress = max_writers_in_progress
        self.core.max_readers_in_progress = max_readers_in_progress

    def __call__(self, operation: Optional[RWOperation] = None):
        """Select the operation (read by default) for the next ``with`` block and return self."""
        if operation is None:
            operation = RWOperation.read

        self.current_context_operation = operation
        return self


class RWLockerContextManager(RWLockerContextManagerBase):
    """Lock context manager used from inside a coroutine.

    The lock is first tried locally; when it is not available the pending
    counter of the entity is increased and a wait request is sent through the
    coroutine's interface.
    """

    def __init__(self, core: RWLockerEntity, interface: Interface):
        super().__init__(core)
        self._interface = interface

    def _release(self) -> None:
        """Release the hold taken on enter and reset the selected operation."""
        # The hold was registered under the coroutine id, so it must be
        # released under the same key (not under the interface object).
        coro_id = self._interface.coro_id
        if RWOperation.write == self.current_context_operation:
            self.core.decrease_writers_in_progress(coro_id)
        else:
            self.core.decrease_readers_in_progress(coro_id)

        self.current_context_operation = None

    def __enter__(self):
        if self.current_context_operation is None:
            self.current_context_operation = RWOperation.read

        if RWOperation.write == self.current_context_operation:
            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
            if need_service_assistance:
                self.core.writers_pending += 1
                self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
        else:
            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
            if need_service_assistance:
                self.core.readers_pending += 1
                self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))

        return self

    def __exit__(self, type, value: Exception, traceback):
        self._release()

    async def __aenter__(self):
        if self.current_context_operation is None:
            self.current_context_operation = RWOperation.read

        if RWOperation.write == self.current_context_operation:
            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
            if need_service_assistance:
                self.core.writers_pending += 1
                await self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
        else:
            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
            if need_service_assistance:
                self.core.readers_pending += 1
                await self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))

        return self

    async def __aexit__(self, type, value, traceback):
        self._release()


class FakeRWLockerContextManager(RWLockerContextManagerBase):
    """Context manager that only does the bookkeeping and never waits.

    ``get_rw_lock`` / ``aget_rw_lock`` return it when there is no current loop
    or no current interface.
    """

    def __init__(self, core: RWLockerEntity):
        super().__init__(core)

    def _acquire(self) -> None:
        """Unconditionally register a hold for the selected operation."""
        if self.current_context_operation is None:
            self.current_context_operation = RWOperation.read

        if RWOperation.write == self.current_context_operation:
            self.core.increase_writers_in_progress()
            self.core.last_operation = RWOperation.write
        else:
            self.core.increase_readers_in_progress()
            self.core.last_operation = RWOperation.read

    def _release(self) -> None:
        """Drop the hold registered by ``_acquire`` and reset the selected operation."""
        if RWOperation.write == self.current_context_operation:
            self.core.decrease_writers_in_progress()
        else:
            self.core.decrease_readers_in_progress()

        self.current_context_operation = None

    def __enter__(self):
        self._acquire()
        return self

    def __exit__(self, type, value: Exception, traceback):
        self._release()

    async def __aenter__(self):
        self._acquire()
        return self

    async def __aexit__(self, type, value, traceback):
        self._release()


class UnknownLockerEntity(Exception):
    """The requested entity id is not registered in the ``RWLocker`` service."""
    pass


class RWLocker(Service, EntityStatsMixin):
    """Service that owns the locker entities and serves waiting coroutines.

    Handlers return a 3-tuple; judging by its use here it is
    ``(response is ready, result, exception)`` - NOTE(review): confirm against
    ``ServiceProcessingResponse`` in coro_scheduler.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(RWLocker, self).__init__(loop)

        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler

        # Request type (see RWLockerRequest) -> handler.
        self._request_workers = {
            0: self._on_register,
            1: self._on_deregister,
            2: self._on_wait_for_write,
            3: self._on_wait_for_read,
        }

        self.locker_entities: Dict[Hashable, RWLockerEntity] = dict()
        self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = dict()

        self.remove_entity_requests: Dict[CoroID, Hashable] = dict()
        self.waiting_for_write_requests: Dict[CoroID, Hashable] = dict()
        self.waiting_for_read_requests: Dict[CoroID, Hashable] = dict()

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        """Return the service name and the sizes of its internal tables."""
        return type(self).__name__, {
            'locker entities num': len(self.locker_entities),
            'affected coroutines num': len(self.entities_by_coroutine),
            'waiting for write requests num': len(self.waiting_for_write_requests),
            'waiting_for_read_requests num': len(self.waiting_for_read_requests),
        }

    def single_task_registration_or_immediate_processing(self, request: Optional[RWLockerRequest]=None
                                                         ) -> ServiceProcessingResponse:
        """Dispatch ``request`` to its handler; answer immediately with no result when there is none."""
        if request is not None:
            return self.resolve_request(request)
        return True, None, None

    def _detach_entity_from_coroutines(self, entity: RWLockerEntity) -> None:
        """Remove ``entity`` from the per-coroutine index of every related coroutine."""
        for related_coro_id in entity.related_coroutines:
            coroutine_entities = self.entities_by_coroutine.get(related_coro_id)
            if coroutine_entities is not None:
                coroutine_entities.discard(entity.entity_id)

    def full_processing_iteration(self):
        """Serve pending removals, then waiting writers, then waiting readers."""
        # entities_waiting_for_remove
        processed_coro_ids: Set[CoroID] = set()
        for coro_id, entity_id in self.remove_entity_requests.items():
            if entity_id not in self.locker_entities:
                self.register_response(coro_id, False, None)
                processed_coro_ids.add(coro_id)
                continue

            entity = self.locker_entities[entity_id]
            need_to_try_later = entity.test_remove()
            if need_to_try_later:
                continue

            del self.locker_entities[entity_id]
            self._detach_entity_from_coroutines(entity)
            # Answer the requester; without a response it would never get the deregister() result.
            self.register_response(coro_id, True, None)
            processed_coro_ids.add(coro_id)

        for coro_id in processed_coro_ids:
            del self.remove_entity_requests[coro_id]

        # entities_waiting_for_write
        processed_coro_ids = set()
        for coro_id, entity_id in self.waiting_for_write_requests.items():
            if entity_id not in self.locker_entities:
                self.register_response(coro_id, None, UnknownLockerEntity())
                processed_coro_ids.add(coro_id)
                continue

            entity = self.locker_entities[entity_id]
            need_to_try_later = entity.try_write_lock(coro_id)
            if need_to_try_later:
                continue

            entity.writers_pending -= 1
            if coro_id in entity.waiting_coroutines:
                entity.waiting_coroutines.remove(coro_id)

            self.register_response(coro_id, None, None)
            processed_coro_ids.add(coro_id)

        for coro_id in processed_coro_ids:
            del self.waiting_for_write_requests[coro_id]

        # entities_waiting_for_read
        processed_coro_ids = set()
        for coro_id, entity_id in self.waiting_for_read_requests.items():
            if entity_id not in self.locker_entities:
                self.register_response(coro_id, None, UnknownLockerEntity())
                processed_coro_ids.add(coro_id)
                continue

            entity = self.locker_entities[entity_id]
            need_to_try_later = entity.try_read_lock(coro_id)
            if need_to_try_later:
                continue

            entity.readers_pending -= 1
            if coro_id in entity.waiting_coroutines:
                entity.waiting_coroutines.remove(coro_id)

            self.register_response(coro_id, None, None)
            processed_coro_ids.add(coro_id)

        for coro_id in processed_coro_ids:
            del self.waiting_for_read_requests[coro_id]

        # general
        if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
            self.make_dead()

    def in_work(self) -> bool:
        """Report whether any request is still queued."""
        result: bool = bool(self.remove_entity_requests) or bool(self.waiting_for_write_requests) or bool(self.waiting_for_read_requests)
        return self.thrifty_in_work(result)

    def get_locker_entity(self, entity_id: Hashable):
        """Return the registered entity for ``entity_id`` or None."""
        return self.locker_entities.get(entity_id)

    def _on_register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> ServiceProcessingResponse:
        """Create the entity if it is new and answer with a context manager bound to the caller."""
        if entity_id not in self.locker_entities:
            self.locker_entities[entity_id] = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, type(self))
        entity: RWLockerEntity = self.locker_entities[entity_id]

        coro_id = self.current_caller_coro_info.coro.coro_id
        entity.related_coroutines.add(coro_id)
        if coro_id not in self.entities_by_coroutine:
            self.entities_by_coroutine[coro_id] = set()

        self.entities_by_coroutine[coro_id].add(entity_id)
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        context_manager: RWLockerContextManager = RWLockerContextManager(entity, self.current_caller_coro_info.coro.interface)
        return True, context_manager, None

    def _on_deregister(self, entity_id: Hashable, safe: bool = True) -> ServiceProcessingResponse:
        """Remove an entity.

        With ``safe`` the removal is queued and answered from
        ``full_processing_iteration`` once no coroutine waits for the entity;
        otherwise the entity is removed immediately and the result tells
        whether it existed.
        """
        result = None
        if safe:
            self.remove_entity_requests[self.current_caller_coro_info.coro.coro_id] = entity_id
            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
            self.make_live()
            return False, None, None
        else:
            if entity_id in self.locker_entities:
                entity = self.locker_entities[entity_id]
                del self.locker_entities[entity_id]
                self._detach_entity_from_coroutines(entity)
                result = True
            else:
                result = False

        return True, result, None

    def _on_wait_for_write(self, entity_id: Hashable) -> ServiceProcessingResponse:
        """Queue the caller until a write lock on ``entity_id`` can be granted."""
        if entity_id not in self.locker_entities:
            return True, None, UnknownLockerEntity()

        coro_id = self.current_caller_coro_info.coro.coro_id
        self.waiting_for_write_requests[coro_id] = entity_id
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        entity = self.locker_entities[entity_id]
        entity.waiting_coroutines.add(coro_id)
        self.make_live()
        return False, None, None

    def _on_wait_for_read(self, entity_id: Hashable) -> ServiceProcessingResponse:
        """Queue the caller until a read lock on ``entity_id`` can be granted."""
        if entity_id not in self.locker_entities:
            return True, None, UnknownLockerEntity()

        coro_id = self.current_caller_coro_info.coro.coro_id
        self.waiting_for_read_requests[coro_id] = entity_id
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        entity = self.locker_entities[entity_id]
        entity.waiting_coroutines.add(coro_id)
        self.make_live()
        return False, None, None

    def _drop_waiting_request(self, requests: Dict[CoroID, Hashable], coro_id: CoroID, operation: RWOperation) -> None:
        """Forget the wait request of ``coro_id`` and undo its pending accounting on the entity."""
        if coro_id not in requests:
            return

        entity_id = requests.pop(coro_id)
        entity = self.locker_entities.get(entity_id)
        if entity is None:
            return

        entity.waiting_coroutines.discard(coro_id)
        # The context manager increased the pending counter before sending the
        # request; a stale counter would keep the opposite operation waiting.
        if RWOperation.write == operation:
            if entity.writers_pending > 0:
                entity.writers_pending -= 1
        else:
            if entity.readers_pending > 0:
                entity.readers_pending -= 1

    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
        """Drop everything the service remembers about a deleted coroutine. Always returns False."""
        # NOTE(review): holds that the coroutine still has "in progress" are not released here.
        coro_id = coro.coro_id
        if coro_id in self.entities_by_coroutine:
            entities = self.entities_by_coroutine[coro_id]
            del self.entities_by_coroutine[coro_id]
            for entity_id in entities:
                if entity_id in self.locker_entities:
                    entity = self.locker_entities[entity_id]
                    if coro_id in entity.related_coroutines:
                        entity.related_coroutines.remove(coro_id)

                    if coro_id in entity.waiting_coroutines:
                        entity.waiting_coroutines.remove(coro_id)

        if coro_id in self.remove_entity_requests:
            del self.remove_entity_requests[coro_id]

        self._drop_waiting_request(self.waiting_for_write_requests, coro_id, RWOperation.write)
        self._drop_waiting_request(self.waiting_for_read_requests, coro_id, RWOperation.read)

        return False

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Per-coroutine deletion hook; delegates to ``_on_coro_del_handler_global``."""
        return self._on_coro_del_handler_global(coro)


RWLockerRequest.default_service_type = RWLocker


def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Return a lock context manager for ``entity_id``.

    A ``FakeRWLockerContextManager`` over a fresh, unregistered entity is
    returned when there is no current loop or no current interface. Otherwise
    the already registered entity is reused, or the entity is registered
    through the ``RWLocker`` service.
    """
    loop = CoroScheduler.current_loop()
    if loop is None:
        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop

    interface = loop.current_interface()
    if interface is None:
        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service

    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
    if locker_entity is None:
        lock = interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
    else:
        lock = RWLockerContextManager(locker_entity, interface)

    return lock


grwl = get_rw_lock


async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Awaitable counterpart of ``get_rw_lock``: the register request is awaited."""
    loop = CoroScheduler.current_loop()
    if loop is None:
        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop

    interface = loop.current_interface()
    if interface is None:
        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service

    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
    if locker_entity is None:
        lock = await interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
    else:
        lock = RWLockerContextManager(locker_entity, interface)

    return lock


agrwl = aget_rw_lock
class
RWOperation(enum.Enum):
Kind of access requested on a locked entity: read (0) or write (1).
read =
<RWOperation.read: 0>
write =
<RWOperation.write: 1>
Inherited Members
- enum.Enum
- name
- value
class
RWLockerRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
class RWLockerRequest(ServiceRequest):
    """Builders for the four request kinds that the RWLocker service handles.

    Every builder forwards a numeric request type followed by its own
    arguments to ``_save`` and returns whatever ``_save`` returns.
    """

    def register(
        self,
        entity_id: Hashable,
        max_writers_in_progress: int,
        max_readers_in_progress: int,
        recursive: bool = True,
    ) -> 'RWLockerContextManager':
        """Request type 0: register a lockable entity with its concurrency limits."""
        request_args = (entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
        return self._save(0, *request_args)

    def deregister(self, entity_id: Hashable, safe: bool = True) -> bool:
        """Request type 1: remove an entity (``safe`` is passed through unchanged)."""
        request_args = (entity_id, safe)
        return self._save(1, *request_args)

    def wait_for_write(self, entity_id: Hashable) -> None:
        """Request type 2: wait for a write lock on the entity."""
        request_args = (entity_id,)
        return self._save(2, *request_args)

    def wait_for_read(self, entity_id: Hashable) -> None:
        """Request type 3: wait for a read lock on the entity."""
        request_args = (entity_id,)
        return self._save(3, *request_args)
def
register( self, entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> RWLockerContextManager:
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] =
<class 'RWLocker'>
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
- default__request__type__
- request_type
- args
- kwargs
- provide_to_request_handler
- interface
- i
- async_interface
- ai
class
RWLockerEntity:
class RWLockerEntity:
    """Lock state of a single entity.

    A negative ``max_writers_in_progress`` / ``max_readers_in_progress``
    disables the corresponding limit. When ``recursive`` is true, holders are
    counted per coroutine id and a coroutine that already holds the lock is
    always allowed to take it again.
    """

    def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
        self.entity_id: Hashable = entity_id
        self.recursive: bool = recursive
        self.service = service
        self.writers_pending: int = 0
        self.writers_in_progress_dict: Dict[CoroID, int] = dict()
        self.writers_in_progress: int = 0
        self.max_writers_in_progress: int = max_writers_in_progress  # Must be edited directly from coroutine. In order to eliminate new writers arrived during the end of the current loop iteration
        self.readers_pending: int = 0
        self.readers_in_progress_dict: Dict[CoroID, int] = dict()
        self.readers_in_progress: int = 0
        self.max_readers_in_progress: int = max_readers_in_progress  # Must be edited directly from coroutine. In order to eliminate new readers arrived during the end of the current loop iteration
        self.last_operation: RWOperation = RWOperation.read  # Default is 'RWOperation.read' in order to force 'write' as a first operation among several first concurrent operations
        self.waiting_coroutines: Set[CoroID] = set()
        self.related_coroutines: Set[CoroID] = set()

    def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
        """Return True when one more writer (``coro_id``) fits into the writers limit."""
        if 0 > self.max_writers_in_progress:
            return True  # negative limit means "unlimited"

        if self.recursive and (coro_id in self.writers_in_progress_dict):
            return True  # re-entry by a coroutine that already writes

        return self.writers_in_progress < self.max_writers_in_progress

    def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Account for one more write hold taken by ``coro_id``."""
        if self.recursive:
            if coro_id not in self.writers_in_progress_dict:
                self.writers_in_progress_dict[coro_id] = 0
                self.writers_in_progress += 1

            self.writers_in_progress_dict[coro_id] += 1
        else:
            self.writers_in_progress += 1

    def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Release one write hold of ``coro_id``.

        Raises:
            KeyError: in recursive mode, when ``coro_id`` holds no write lock.
        """
        if self.recursive:
            self.writers_in_progress_dict[coro_id] -= 1
            if 0 >= self.writers_in_progress_dict[coro_id]:
                del self.writers_in_progress_dict[coro_id]
                self.writers_in_progress -= 1
        else:
            self.writers_in_progress -= 1

    def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
        """Return True when one more reader (``coro_id``) fits into the readers limit."""
        if 0 > self.max_readers_in_progress:
            return True  # negative limit means "unlimited"

        if self.recursive and (coro_id in self.readers_in_progress_dict):
            return True  # re-entry by a coroutine that already reads

        return self.readers_in_progress < self.max_readers_in_progress

    def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Account for one more read hold taken by ``coro_id``."""
        if self.recursive:
            if coro_id not in self.readers_in_progress_dict:
                self.readers_in_progress_dict[coro_id] = 0
                self.readers_in_progress += 1

            self.readers_in_progress_dict[coro_id] += 1
        else:
            self.readers_in_progress += 1

    def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
        """Release one read hold of ``coro_id``.

        Raises:
            KeyError: in recursive mode, when ``coro_id`` holds no read lock.
        """
        if self.recursive:
            self.readers_in_progress_dict[coro_id] -= 1
            if 0 >= self.readers_in_progress_dict[coro_id]:
                del self.readers_in_progress_dict[coro_id]
                self.readers_in_progress -= 1
        else:
            self.readers_in_progress -= 1

    def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
        """Try to take the write lock for ``coro_id``.

        With ``apply=False`` nothing is modified; the call only reports
        whether the lock could be taken right now.

        Returns:
            True when the caller has to try again later, False when the lock
            is (or, with ``apply=False``, could be) taken.
        """
        need_to_try_later = False
        if self.readers_in_progress or self.readers_pending:
            # Readers compete for the entity: a writer goes first only when no
            # reader is active and the previous granted operation was a read.
            if (not self.readers_in_progress) and self.readers_pending and (RWOperation.read == self.last_operation) and self.check_writers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_writers_in_progress(coro_id)
                    self.last_operation = RWOperation.write
            else:
                need_to_try_later = True
        else:
            if self.check_writers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_writers_in_progress(coro_id)
                    self.last_operation = RWOperation.write
            else:
                need_to_try_later = True

        return need_to_try_later

    def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
        """Try to take the read lock for ``coro_id``.

        With ``apply=False`` nothing is modified; the call only reports
        whether the lock could be taken right now.

        Returns:
            True when the caller has to try again later, False when the lock
            is (or, with ``apply=False``, could be) taken.
        """
        need_to_try_later = False
        if self.writers_in_progress or self.writers_pending:
            # Writers compete for the entity: a reader goes first only when no
            # writer is active and the previous granted operation was a write.
            if (not self.writers_in_progress) and self.writers_pending and (RWOperation.write == self.last_operation) and self.check_readers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_readers_in_progress(coro_id)
                    self.last_operation = RWOperation.read
            else:
                need_to_try_later = True
        else:
            if self.check_readers_in_progress_boundaries(coro_id):
                if apply:
                    self.increase_readers_in_progress(coro_id)
                    self.last_operation = RWOperation.read
            else:
                need_to_try_later = True

        return need_to_try_later

    def test_remove(self) -> bool:
        """Return True while some coroutine still waits for this entity (removal must be retried)."""
        # bool() keeps the annotated contract and does not leak the internal set.
        return bool(self.waiting_coroutines)
RWLockerEntity( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service])
def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
    """Create the lock state for ``entity_id`` with no holders and no waiters.

    The two limits are stored as given; ``service`` is stored for later use
    by code that sends requests on behalf of this entity.
    """
    # Identity and configuration.
    self.entity_id: Hashable = entity_id
    self.recursive: bool = recursive
    self.service = service

    # Limits. Must be edited directly from coroutine. In order to eliminate
    # new writers/readers arrived during the end of the current loop iteration.
    self.max_writers_in_progress: int = max_writers_in_progress
    self.max_readers_in_progress: int = max_readers_in_progress

    # Writer accounting.
    self.writers_pending: int = 0
    self.writers_in_progress: int = 0
    self.writers_in_progress_dict: Dict[CoroID, int] = {}

    # Reader accounting.
    self.readers_pending: int = 0
    self.readers_in_progress: int = 0
    self.readers_in_progress_dict: Dict[CoroID, int] = {}

    # Default is 'RWOperation.read' in order to force 'write' as a first
    # operation among several first concurrent operations.
    self.last_operation: RWOperation = RWOperation.read

    # Coroutine bookkeeping.
    self.waiting_coroutines: Set[CoroID] = set()
    self.related_coroutines: Set[CoroID] = set()
last_operation: RWOperation
def
check_writers_in_progress_boundaries(self, coro_id: typing.Union[int, NoneType] = None) -> bool:
def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
    """Tell whether one more writer identified by ``coro_id`` is within the writers limit.

    A negative limit always allows the writer. In recursive mode a coroutine
    already present in ``writers_in_progress_dict`` is always allowed too.
    """
    limit = self.max_writers_in_progress
    if limit < 0:
        return True

    already_writing = self.recursive and (coro_id in self.writers_in_progress_dict)
    if already_writing:
        return True

    return self.writers_in_progress < limit
def
increase_writers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
    """Record one more write hold.

    In recursive mode holds are counted per ``coro_id`` and the total grows
    only on the first hold of a coroutine; otherwise the total simply grows.
    """
    if not self.recursive:
        self.writers_in_progress += 1
        return

    holds = self.writers_in_progress_dict
    if coro_id not in holds:
        # First hold of this coroutine: it becomes a distinct active writer.
        holds[coro_id] = 0
        self.writers_in_progress += 1

    holds[coro_id] += 1
def
decrease_writers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
    """Drop one write hold.

    In recursive mode the per-coroutine counter is decreased and, once it is
    no longer positive, the coroutine is removed and the total shrinks. A
    ``coro_id`` without a recorded hold raises ``KeyError``.
    """
    if not self.recursive:
        self.writers_in_progress -= 1
        return

    holds = self.writers_in_progress_dict
    remaining = holds[coro_id] - 1
    if remaining > 0:
        holds[coro_id] = remaining
    else:
        # Last hold released: the coroutine is no longer an active writer.
        del holds[coro_id]
        self.writers_in_progress -= 1
def
check_readers_in_progress_boundaries(self, coro_id: typing.Union[int, NoneType] = None) -> bool:
def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
    """Tell whether one more reader may be admitted right now.

    Args:
        coro_id: Id of the coroutine asking for the lock. Only consulted
            when the entity is recursive.

    Returns:
        bool: True when the reader limit is negative (unlimited), when the
        entity is recursive and ``coro_id`` already holds a read lock, or
        when the number of active readers is below the limit.
    """
    # A negative limit means "unlimited" in both modes.
    if 0 > self.max_readers_in_progress:
        return True

    # Re-entry by a coroutine that already reads never counts against the limit.
    if self.recursive and coro_id in self.readers_in_progress_dict:
        return True

    return self.readers_in_progress < self.max_readers_in_progress
def
increase_readers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
    """Record that a reader has acquired the lock.

    In recursive mode the per-coroutine hold count is incremented and the
    active-reader total grows only on the coroutine's first hold. In
    non-recursive mode every call increments the total.

    Args:
        coro_id: Id of the acquiring coroutine (used in recursive mode only).
    """
    if not self.recursive:
        self.readers_in_progress += 1
        return

    holds = self.readers_in_progress_dict
    if coro_id not in holds:
        # First hold by this coroutine: it becomes a new active reader.
        holds[coro_id] = 0
        self.readers_in_progress += 1

    holds[coro_id] += 1
def
decrease_readers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
    """Record that a reader has released the lock.

    In recursive mode the per-coroutine hold count is decremented; once it
    reaches zero the coroutine's entry is dropped and the active-reader
    total shrinks. In non-recursive mode every call decrements the total.

    Args:
        coro_id: Id of the releasing coroutine (used in recursive mode only).

    Raises:
        KeyError: In recursive mode, when ``coro_id`` holds no read lock.
    """
    if not self.recursive:
        self.readers_in_progress -= 1
        return

    holds = self.readers_in_progress_dict
    remaining = holds[coro_id] - 1
    if remaining > 0:
        holds[coro_id] = remaining
    else:
        # Last hold released: the coroutine is no longer an active reader.
        del holds[coro_id]
        self.readers_in_progress -= 1
def
try_write_lock( self, coro_id: typing.Union[int, NoneType] = None, apply: bool = True) -> bool:
def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
    """Try to take the write lock without waiting.

    The write lock is refused while any reader is active, while readers are
    pending and the previous granted operation was not a read, or when the
    writer limit is reached.

    Args:
        coro_id: Id of the coroutine asking for the lock.
        apply: When false, only test whether the lock could be taken and
            leave all counters untouched.

    Returns:
        bool: True when the caller has to try again later, False when the
        lock was granted (or could be granted, with ``apply=False``).
    """
    # Active readers always block a writer.
    if self.readers_in_progress:
        return True

    # With readers queued, a writer may only go if the last grant was a read.
    if self.readers_pending and RWOperation.read != self.last_operation:
        return True

    if not self.check_writers_in_progress_boundaries(coro_id):
        return True

    if apply:
        self.increase_writers_in_progress(coro_id)
        self.last_operation = RWOperation.write

    return False
def
try_read_lock( self, coro_id: typing.Union[int, NoneType] = None, apply: bool = True) -> bool:
def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
    """Try to take the read lock without waiting.

    The read lock is refused while any writer is active, while writers are
    pending and the previous granted operation was not a write, or when the
    reader limit is reached.

    Args:
        coro_id: Id of the coroutine asking for the lock.
        apply: When false, only test whether the lock could be taken and
            leave all counters untouched.

    Returns:
        bool: True when the caller has to try again later, False when the
        lock was granted (or could be granted, with ``apply=False``).
    """
    # Active writers always block a reader.
    if self.writers_in_progress:
        return True

    # With writers queued, a reader may only go if the last grant was a write.
    if self.writers_pending and RWOperation.write != self.last_operation:
        return True

    if not self.check_readers_in_progress_boundaries(coro_id):
        return True

    if apply:
        self.increase_readers_in_progress(coro_id)
        self.last_operation = RWOperation.read

    return False
class RWLockerContextManager(RWLockerContextManagerBase):
    """Read/write lock handle bound to one entity and one coroutine interface.

    Usable both as a regular and as an asynchronous context manager. On
    entry the lock is taken immediately when possible; otherwise the
    coroutine is registered as pending on the entity and a wait request is
    sent to the entity's service. On exit the lock is released under the
    same coroutine id that acquired it.
    """

    def __init__(self, core: RWLockerEntity, interface: Interface):
        """Bind the handle to a lock entity and the owning coroutine's interface.

        Args:
            core: Lock state of the entity being guarded.
            interface: Interface of the coroutine that will hold the lock.
        """
        super().__init__(core)
        self._interface = interface

    def _acquire_or_build_wait_request(self) -> Optional[RWLockerRequest]:
        """Try to lock immediately; return a wait request when that fails.

        Returns:
            The request to send to the service when the lock could not be
            taken right away (the matching pending counter is already
            incremented), or None when the lock is now held.
        """
        # An unset operation defaults to a read lock.
        if self.current_context_operation is None:
            self.current_context_operation = RWOperation.read

        coro_id = self._interface.coro_id
        if RWOperation.write == self.current_context_operation:
            if self.core.try_write_lock(coro_id):
                self.core.writers_pending += 1
                return RWLockerRequest().wait_for_write(self.core.entity_id)
        else:
            if self.core.try_read_lock(coro_id):
                self.core.readers_pending += 1
                return RWLockerRequest().wait_for_read(self.core.entity_id)

        return None

    def _release_current_operation(self):
        """Release the lock taken on entry and reset the selected operation."""
        # The counters are keyed by coroutine id on acquisition, so the same
        # key must be used here. Passing the interface object itself made the
        # recursive bookkeeping look up a key that was never inserted.
        coro_id = self._interface.coro_id
        if RWOperation.write == self.current_context_operation:
            self.core.decrease_writers_in_progress(coro_id)
        else:
            self.core.decrease_readers_in_progress(coro_id)

        self.current_context_operation = None

    def __enter__(self):
        wait_request = self._acquire_or_build_wait_request()
        if wait_request is not None:
            # Blocks this coroutine until the service grants the lock.
            self._interface(self.core.service, wait_request)

        return self

    def __exit__(self, type, value: Exception, traceback):
        self._release_current_operation()

    async def __aenter__(self):
        wait_request = self._acquire_or_build_wait_request()
        if wait_request is not None:
            # Suspends this coroutine until the service grants the lock.
            await self._interface(self.core.service, wait_request)

        return self

    async def __aexit__(self, type, value, traceback):
        self._release_current_operation()
RWLockerContextManager( core: RWLockerEntity, interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface)
Inherited Members
class FakeRWLockerContextManager(RWLockerContextManagerBase):
    """Lock handle that never waits.

    Entering only updates the counters of the wrapped entity and records the
    last operation; no service is contacted and no limit is checked.
    """

    def __init__(self, core: RWLockerEntity):
        super().__init__(core)

    def _enter_without_waiting(self):
        """Mark the selected operation (read by default) as in progress."""
        if self.current_context_operation is None:
            self.current_context_operation = RWOperation.read

        writing = RWOperation.write == self.current_context_operation
        if writing:
            self.core.increase_writers_in_progress()
            self.core.last_operation = RWOperation.write
        else:
            self.core.increase_readers_in_progress()
            self.core.last_operation = RWOperation.read

        return self

    def _leave(self):
        """Undo the counter change made on entry and reset the operation."""
        writing = RWOperation.write == self.current_context_operation
        if writing:
            self.core.decrease_writers_in_progress()
        else:
            self.core.decrease_readers_in_progress()

        self.current_context_operation = None

    def __enter__(self):
        return self._enter_without_waiting()

    def __exit__(self, type, value: Exception, traceback):
        self._leave()

    async def __aenter__(self):
        return self._enter_without_waiting()

    async def __aexit__(self, type, value, traceback):
        self._leave()
FakeRWLockerContextManager( core: RWLockerEntity)
Inherited Members
class
UnknownLockerEntity(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class
RWLocker(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
class RWLocker(Service, EntityStatsMixin):
    """Scheduler service that arbitrates read/write locks between coroutines.

    Lock entities are created on registration and kept in
    ``locker_entities``. Requests that cannot be answered at once (safe
    deregistration, waiting for a write lock, waiting for a read lock) are
    queued per coroutine id and retried in ``full_processing_iteration``.
    """

    def __init__(self, loop: CoroSchedulerType):
        super(RWLocker, self).__init__(loop)

        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler

        # Request type id -> handler.
        # NOTE(review): ids are presumably the ones produced by RWLockerRequest - confirm.
        self._request_workers = {
            0: self._on_register,
            1: self._on_deregister,
            2: self._on_wait_for_write,
            3: self._on_wait_for_read,
        }

        self.locker_entities: Dict[Hashable, RWLockerEntity] = dict()
        self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = dict()

        # Queued requests, keyed by the id of the coroutine that is waiting.
        self.remove_entity_requests: Dict[CoroID, Hashable] = dict()
        self.waiting_for_write_requests: Dict[CoroID, Hashable] = dict()
        self.waiting_for_read_requests: Dict[CoroID, Hashable] = dict()

    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
        """Return the service name and the sizes of its internal tables."""
        return type(self).__name__, {
            'locker entities num': len(self.locker_entities),
            'affected coroutines num': len(self.entities_by_coroutine),
            'waiting for write requests num': len(self.waiting_for_write_requests),
            'waiting_for_read_requests num': len(self.waiting_for_read_requests),
        }

    def single_task_registration_or_immediate_processing(self, request: Optional[RWLockerRequest]=None
                                                         ) -> ServiceProcessingResponse:
        """Dispatch a request to its handler, or answer at once when there is none."""
        if request is not None:
            return self.resolve_request(request)
        return True, None, None

    def full_processing_iteration(self):
        """Retry every queued request once and answer those that can complete.

        Deregistration requests are handled first, then writers, then
        readers. The service is put to sleep when nothing is left queued.
        """
        # Safe deregistration requests.
        processed_coro_ids: Set[CoroID] = set()
        for coro_id, entity_id in self.remove_entity_requests.items():
            entity = self.locker_entities.get(entity_id)
            if entity is None:
                self.register_response(coro_id, False, None)
                processed_coro_ids.add(coro_id)
                continue

            if entity.test_remove():
                # Entity is still in use; retry on a later iteration.
                continue

            del self.locker_entities[entity_id]
            # The requester must be answered, otherwise it is never resumed.
            # True mirrors the result of a successful unsafe deregistration.
            self.register_response(coro_id, True, None)
            processed_coro_ids.add(coro_id)

        for coro_id in processed_coro_ids:
            del self.remove_entity_requests[coro_id]

        # Waiting writers first, then waiting readers.
        for requests, is_write in ((self.waiting_for_write_requests, True), (self.waiting_for_read_requests, False)):
            processed_coro_ids = set()
            for coro_id, entity_id in requests.items():
                entity = self.locker_entities.get(entity_id)
                if entity is None:
                    # An instance, consistent with the immediate-answer path
                    # in _on_wait_for_write/_on_wait_for_read.
                    self.register_response(coro_id, None, UnknownLockerEntity())
                    processed_coro_ids.add(coro_id)
                    continue

                if is_write:
                    need_to_try_later = entity.try_write_lock(coro_id)
                else:
                    need_to_try_later = entity.try_read_lock(coro_id)

                if need_to_try_later:
                    continue

                # Lock granted: the coroutine is no longer pending.
                if is_write:
                    entity.writers_pending -= 1
                else:
                    entity.readers_pending -= 1

                entity.waiting_coroutines.discard(coro_id)
                self.register_response(coro_id, None, None)
                processed_coro_ids.add(coro_id)

            for coro_id in processed_coro_ids:
                del requests[coro_id]

        # general
        if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
            self.make_dead()

    def in_work(self) -> bool:
        """Report whether any queued request still needs processing."""
        result: bool = bool(self.remove_entity_requests) or bool(self.waiting_for_write_requests) or bool(self.waiting_for_read_requests)
        return self.thrifty_in_work(result)

    def get_locker_entity(self, entity_id: Hashable):
        """Return the entity registered under ``entity_id``, or None."""
        return self.locker_entities.get(entity_id)

    def _on_register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> ServiceProcessingResponse:
        """Create the entity if needed and hand the caller a lock handle.

        The limits and the ``recursive`` flag are only used when the entity
        does not exist yet; an existing entity is reused unchanged.
        """
        if entity_id not in self.locker_entities:
            self.locker_entities[entity_id] = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, type(self))
        entity: RWLockerEntity = self.locker_entities[entity_id]

        coro_id = self.current_caller_coro_info.coro.coro_id
        entity.related_coroutines.add(coro_id)
        if coro_id not in self.entities_by_coroutine:
            self.entities_by_coroutine[coro_id] = set()

        self.entities_by_coroutine[coro_id].add(entity_id)
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        context_manager: RWLockerContextManager = RWLockerContextManager(entity, self.current_caller_coro_info.coro.interface)
        return True, context_manager, None

    def _on_deregister(self, entity_id: Hashable, safe: bool = True) -> ServiceProcessingResponse:
        """Remove an entity, either deferred (``safe``) or immediately.

        A safe request is queued and answered from
        ``full_processing_iteration``. An unsafe request removes the entity
        at once and answers True, or False when the entity is unknown.
        """
        if safe:
            self.remove_entity_requests[self.current_caller_coro_info.coro.coro_id] = entity_id
            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
            self.make_live()
            return False, None, None

        entity = self.locker_entities.pop(entity_id, None)
        if entity is None:
            return True, False, None

        for related_coro_id in entity.related_coroutines:
            self.entities_by_coroutine[related_coro_id].discard(entity_id)

        return True, True, None

    def _on_wait_for_write(self, entity_id: Hashable) -> ServiceProcessingResponse:
        """Queue the calling coroutine until a write lock can be granted."""
        if entity_id not in self.locker_entities:
            return True, None, UnknownLockerEntity()

        coro_id = self.current_caller_coro_info.coro.coro_id
        self.waiting_for_write_requests[coro_id] = entity_id
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        entity = self.locker_entities[entity_id]
        entity.waiting_coroutines.add(coro_id)
        self.make_live()
        return False, None, None

    def _on_wait_for_read(self, entity_id: Hashable) -> ServiceProcessingResponse:
        """Queue the calling coroutine until a read lock can be granted."""
        if entity_id not in self.locker_entities:
            return True, None, UnknownLockerEntity()

        coro_id = self.current_caller_coro_info.coro.coro_id
        self.waiting_for_read_requests[coro_id] = entity_id
        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
        entity = self.locker_entities[entity_id]
        entity.waiting_coroutines.add(coro_id)
        self.make_live()
        return False, None, None

    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
        """Forget everything the service tracks about a deleted coroutine."""
        coro_id = coro.coro_id

        # Roll back the pending counter of a coroutine that dies while it is
        # still waiting. try_read_lock/try_write_lock consult these counters,
        # so a stale value could keep blocking the other side indefinitely.
        # This must run before waiting_coroutines is cleaned below, because
        # membership there is what proves the wait was registered.
        if coro_id in self.waiting_for_write_requests:
            entity = self.locker_entities.get(self.waiting_for_write_requests.pop(coro_id))
            if (entity is not None) and (coro_id in entity.waiting_coroutines):
                entity.writers_pending -= 1
                entity.waiting_coroutines.discard(coro_id)

        if coro_id in self.waiting_for_read_requests:
            entity = self.locker_entities.get(self.waiting_for_read_requests.pop(coro_id))
            if (entity is not None) and (coro_id in entity.waiting_coroutines):
                entity.readers_pending -= 1
                entity.waiting_coroutines.discard(coro_id)

        if coro_id in self.entities_by_coroutine:
            entities = self.entities_by_coroutine[coro_id]
            del self.entities_by_coroutine[coro_id]
            for entity_id in entities:
                if entity_id in self.locker_entities:
                    entity = self.locker_entities[entity_id]
                    entity.related_coroutines.discard(coro_id)
                    entity.waiting_coroutines.discard(coro_id)

        if coro_id in self.remove_entity_requests:
            del self.remove_entity_requests[coro_id]

        return False

    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
        """Per-coroutine deletion hook; delegates to the global handler."""
        return self._on_coro_del_handler_global(coro)
RWLocker( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
def __init__(self, loop: CoroSchedulerType):
    """Set up the request dispatch table and the empty bookkeeping tables.

    Args:
        loop: Scheduler instance forwarded to the base class initialiser.
    """
    super(RWLocker, self).__init__(loop)

    # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler

    # Request type id -> bound handler.
    handlers = (
        self._on_register,
        self._on_deregister,
        self._on_wait_for_write,
        self._on_wait_for_read,
    )
    self._request_workers = dict(enumerate(handlers))

    # Registered entities and, per coroutine, the entities it registered.
    self.locker_entities: Dict[Hashable, RWLockerEntity] = {}
    self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = {}

    # Queued requests, keyed by the id of the waiting coroutine.
    self.remove_entity_requests: Dict[CoroID, Hashable] = {}
    self.waiting_for_write_requests: Dict[CoroID, Hashable] = {}
    self.waiting_for_read_requests: Dict[CoroID, Hashable] = {}
locker_entities: Dict[Hashable, RWLockerEntity]
def
get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
    """Return the service's class name and the sizes of its internal tables.

    Args:
        stats_level: Requested level of detail; not used by this method.

    Returns:
        Tuple[str, Dict[str, Any]]: Class name and a mapping of counter
        names to their current values.
    """
    counters: Dict[str, Any] = {
        'locker entities num': len(self.locker_entities),
        'affected coroutines num': len(self.entities_by_coroutine),
        'waiting for write requests num': len(self.waiting_for_write_requests),
        'waiting_for_read_requests num': len(self.waiting_for_read_requests),
    }
    service_name = type(self).__name__
    return service_name, counters
def
single_task_registration_or_immediate_processing( self, request: typing.Union[RWLockerRequest, NoneType] = None) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
def
full_processing_iteration(self):
def full_processing_iteration(self):
    """Retry every queued request once and answer those that can complete.

    Deregistration requests are handled first, then waiting writers, then
    waiting readers. Answered requests are removed from their queue, and
    the service is put to sleep when no request is left.
    """
    # Safe deregistration requests.
    processed_coro_ids: Set[CoroID] = set()
    for coro_id, entity_id in self.remove_entity_requests.items():
        entity = self.locker_entities.get(entity_id)
        if entity is None:
            self.register_response(coro_id, False, None)
            processed_coro_ids.add(coro_id)
            continue

        if entity.test_remove():
            # Entity is still in use; retry on a later iteration.
            continue

        del self.locker_entities[entity_id]
        # The requester must be answered, otherwise it is never resumed.
        # True mirrors the result of a successful unsafe deregistration.
        self.register_response(coro_id, True, None)
        processed_coro_ids.add(coro_id)

    for coro_id in processed_coro_ids:
        del self.remove_entity_requests[coro_id]

    # Waiting writers first, then waiting readers.
    for requests, is_write in ((self.waiting_for_write_requests, True), (self.waiting_for_read_requests, False)):
        processed_coro_ids = set()
        for coro_id, entity_id in requests.items():
            entity = self.locker_entities.get(entity_id)
            if entity is None:
                # An instance, consistent with the immediate-answer path of
                # the wait handlers (the class itself was passed here before).
                self.register_response(coro_id, None, UnknownLockerEntity())
                processed_coro_ids.add(coro_id)
                continue

            if is_write:
                need_to_try_later = entity.try_write_lock(coro_id)
            else:
                need_to_try_later = entity.try_read_lock(coro_id)

            if need_to_try_later:
                continue

            # Lock granted: the coroutine is no longer pending.
            if is_write:
                entity.writers_pending -= 1
            else:
                entity.readers_pending -= 1

            entity.waiting_coroutines.discard(coro_id)
            self.register_response(coro_id, None, None)
            processed_coro_ids.add(coro_id)

        for coro_id in processed_coro_ids:
            del requests[coro_id]

    # general
    if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
        self.make_dead()
def
in_work(self) -> bool:
def in_work(self) -> bool:
    """Report whether any queued request still needs processing.

    Returns:
        bool: The value of ``thrifty_in_work`` applied to whether at least
        one deregistration, wait-for-write or wait-for-read request is
        queued.
    """
    queues = (
        self.remove_entity_requests,
        self.waiting_for_write_requests,
        self.waiting_for_read_requests,
    )
    has_queued_requests: bool = any(queues)
    return self.thrifty_in_work(has_queued_requests)
Will be executed twice per iteration: once before and once after the full_processing_iteration() execution
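Raises: NotImplementedError: Presumably raised only by the base Service implementation when a subclass does not override in_work(); the RWLocker.in_work() shown above does not raise it.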
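Returns: bool: The value returned by thrifty_in_work(result), where result is True when at least one deregistration, wait-for-write or wait-for-read request is still queued.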
Inherited Members
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
- current_caller_coro_info
- iteration
- make_response
- register_response
- put_task
- resolve_request
- try_resolve_request
- in_forground_work
- thrifty_in_work
- time_left_before_next_event
- is_low_latency
- make_live
- make_dead
- service_id_impl
- service_id
- destroy
- cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
- StatsLevel
def
get_rw_lock( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Return a read/write lock handle for ``entity_id``.

    Args:
        entity_id: Key of the entity to lock.
        max_writers_in_progress: Writer limit used when a new entity is created.
        max_readers_in_progress: Reader limit used when a new entity is created.
        recursive: Recursion flag used when a new entity is created.

    Returns:
        A ``FakeRWLockerContextManager`` around a fresh entity when there is
        no current loop or no current interface; otherwise a
        ``RWLockerContextManager`` for the already registered entity, or the
        response of the ``register`` request sent to ``RWLocker``.
    """
    loop = CoroScheduler.current_loop()
    # No loop: running not from inside the loop. No interface: running from a Service.
    interface = None if loop is None else loop.current_interface()
    if interface is None:
        standalone_entity = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker)
        return FakeRWLockerContextManager(standalone_entity)

    service = interface._loop.get_service_instance(RWLocker)
    known_entity = service.get_locker_entity(entity_id)
    if known_entity is not None:
        return RWLockerContextManager(known_entity, interface)

    register_request = RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
    return interface(RWLocker, register_request)
def
grwl( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Return a read/write lock handle for ``entity_id``.

    Args:
        entity_id: Key of the entity to lock.
        max_writers_in_progress: Writer limit used when a new entity is created.
        max_readers_in_progress: Reader limit used when a new entity is created.
        recursive: Recursion flag used when a new entity is created.

    Returns:
        A ``FakeRWLockerContextManager`` around a fresh entity when there is
        no current loop or no current interface; otherwise a
        ``RWLockerContextManager`` for the already registered entity, or the
        response of the ``register`` request sent to ``RWLocker``.
    """
    loop = CoroScheduler.current_loop()
    # No loop: running not from inside the loop. No interface: running from a Service.
    interface = None if loop is None else loop.current_interface()
    if interface is None:
        standalone_entity = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker)
        return FakeRWLockerContextManager(standalone_entity)

    service = interface._loop.get_service_instance(RWLocker)
    known_entity = service.get_locker_entity(entity_id)
    if known_entity is not None:
        return RWLockerContextManager(known_entity, interface)

    register_request = RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
    return interface(RWLocker, register_request)
async def
aget_rw_lock( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Asynchronously return a read/write lock handle for ``entity_id``.

    Args:
        entity_id: Key of the entity to lock.
        max_writers_in_progress: Writer limit used when a new entity is created.
        max_readers_in_progress: Reader limit used when a new entity is created.
        recursive: Recursion flag used when a new entity is created.

    Returns:
        A ``FakeRWLockerContextManager`` around a fresh entity when there is
        no current loop or no current interface; otherwise a
        ``RWLockerContextManager`` for the already registered entity, or the
        awaited response of the ``register`` request sent to ``RWLocker``.
    """
    loop = CoroScheduler.current_loop()
    # No loop: running not from inside the loop. No interface: running from a Service.
    interface = None if loop is None else loop.current_interface()
    if interface is None:
        standalone_entity = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker)
        return FakeRWLockerContextManager(standalone_entity)

    service = interface._loop.get_service_instance(RWLocker)
    known_entity = service.get_locker_entity(entity_id)
    if known_entity is not None:
        return RWLockerContextManager(known_entity, interface)

    register_request = RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
    return await interface(RWLocker, register_request)
async def
agrwl( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
    """Asynchronously return a read/write lock handle for ``entity_id``.

    Args:
        entity_id: Key of the entity to lock.
        max_writers_in_progress: Writer limit used when a new entity is created.
        max_readers_in_progress: Reader limit used when a new entity is created.
        recursive: Recursion flag used when a new entity is created.

    Returns:
        A ``FakeRWLockerContextManager`` around a fresh entity when there is
        no current loop or no current interface; otherwise a
        ``RWLockerContextManager`` for the already registered entity, or the
        awaited response of the ``register`` request sent to ``RWLocker``.
    """
    loop = CoroScheduler.current_loop()
    # No loop: running not from inside the loop. No interface: running from a Service.
    interface = None if loop is None else loop.current_interface()
    if interface is None:
        standalone_entity = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker)
        return FakeRWLockerContextManager(standalone_entity)

    service = interface._loop.get_service_instance(RWLocker)
    known_entity = service.get_locker_entity(entity_id)
    if known_entity is not None:
        return RWLockerContextManager(known_entity, interface)

    register_request = RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
    return await interface(RWLocker, register_request)