cengal.parallel_execution.coroutines.coro_standard_services.read_write_locker.versions.v_0.read_write_locker

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37__all__ = [
 38    'RWOperation', 'RWLockerRequest', 'RWLockerEntity', 'RWLockerContextManager', 'FakeRWLockerContextManager', 'UnknownLockerEntity', 'RWLocker', 'get_rw_lock', 'grwl', 'aget_rw_lock', 'agrwl'
 39]
 40
 41from cengal.parallel_execution.coroutines.coro_scheduler import *
 42from cengal.parallel_execution.coroutines.coro_tools.await_coro import *
 43from enum import Enum
 44from typing import Dict, Hashable, Tuple, Union, Type, Optional, Any, List, Set
 45from cengal.time_management.repeat_for_a_time import Tracer
 46from cengal.code_flow_control.smart_values.versions.v_1 import ValueExistence
 47from async_generator import asynccontextmanager, async_generator, yield_
 48import asyncio
 49
 50
 51class RWOperation(Enum):
 52    read = 0
 53    write = 1
 54
 55
 56class RWLockerRequest(ServiceRequest):
 57    def register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> 'RWLockerContextManager':
 58        return self._save(0, entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
 59
 60    def deregister(self, entity_id: Hashable, safe: bool = True) -> bool:
 61        return self._save(1, entity_id, safe)
 62    
 63    def wait_for_write(self, entity_id: Hashable) -> None:
 64        return self._save(2, entity_id)
 65    
 66    def wait_for_read(self, entity_id: Hashable) -> None:
 67        return self._save(3, entity_id)
 68
 69
 70class RWLockerEntity:
 71    def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
 72        self.entity_id: Hashable = entity_id
 73        self.recursive: bool = recursive
 74        self.service = service
 75        self.writers_pending: int = 0
 76        self.writers_in_progress_dict: Dict[CoroID, int] = dict()
 77        self.writers_in_progress: int = 0
 78        self.max_writers_in_progress: int = max_writers_in_progress  # Must be edited directly from coroutine. In order to eliminate new writers arrived during the end of the current loop iteration
 79        self.readers_pending: int = 0
 80        self.readers_in_progress_dict: Dict[CoroID, int] = dict()
 81        self.readers_in_progress: int = 0
 82        self.max_readers_in_progress: int = max_readers_in_progress  # Must be edited directly from coroutine. In order to eliminate new readers arrived during the end of the current loop iteration
 83        self.last_operation: RWOperation = RWOperation.read  # Default is 'RWOperation.read' in order to force 'write' as a first operation among several first concurent operations
 84        self.waiting_coroutines: Set[CoroID] = set()
 85        self.related_coroutines: Set[CoroID] = set()
 86
 87    def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
 88        if self.recursive:
 89            if 0 > self.max_writers_in_progress:
 90                return True
 91            else:
 92                if coro_id in self.writers_in_progress_dict:
 93                    return True
 94                else:
 95                    return self.writers_in_progress < self.max_writers_in_progress
 96        else:
 97            if 0 > self.max_writers_in_progress:
 98                return True
 99            else:
100                return self.writers_in_progress < self.max_writers_in_progress
101    
102    def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
103        if self.recursive:
104            if coro_id not in self.writers_in_progress_dict:
105                self.writers_in_progress_dict[coro_id] = 0
106                self.writers_in_progress += 1
107            
108            self.writers_in_progress_dict[coro_id] += 1
109        else:
110            self.writers_in_progress += 1
111    
112    def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
113        if self.recursive:
114            self.writers_in_progress_dict[coro_id] -= 1
115            if 0 >= self.writers_in_progress_dict[coro_id]:
116                del self.writers_in_progress_dict[coro_id]
117                self.writers_in_progress -= 1
118        else:
119            self.writers_in_progress -= 1
120
121    def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
122        if self.recursive:
123            if 0 > self.max_readers_in_progress:
124                return True
125            else:
126                if coro_id in self.readers_in_progress_dict:
127                    return True
128                else:
129                    return self.readers_in_progress < self.max_readers_in_progress
130        else:
131            if 0 > self.max_readers_in_progress:
132                return True
133            else:
134                return self.readers_in_progress < self.max_readers_in_progress
135    
136    def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
137        if self.recursive:
138            if coro_id not in self.readers_in_progress_dict:
139                self.readers_in_progress_dict[coro_id] = 0
140                self.readers_in_progress += 1
141            
142            self.readers_in_progress_dict[coro_id] += 1
143        else:
144            self.readers_in_progress += 1
145    
146    def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
147        if self.recursive:
148            self.readers_in_progress_dict[coro_id] -= 1
149            if 0 >= self.readers_in_progress_dict[coro_id]:
150                del self.readers_in_progress_dict[coro_id]
151                self.readers_in_progress -= 1
152        else:
153            self.readers_in_progress -= 1
154
155    def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
156        need_to_try_later = False
157        if self.readers_in_progress or self.readers_pending:
158            if (not self.readers_in_progress) and self.readers_pending and (RWOperation.read == self.last_operation) and self.check_writers_in_progress_boundaries(coro_id):
159                if apply:
160                    self.increase_writers_in_progress(coro_id)
161                    self.last_operation = RWOperation.write
162            else:
163                need_to_try_later = True
164        else:
165            if self.check_writers_in_progress_boundaries(coro_id):
166                if apply:
167                    self.increase_writers_in_progress(coro_id)
168                    self.last_operation = RWOperation.write
169            else:
170                need_to_try_later = True
171        
172        return need_to_try_later
173
174    def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
175        need_to_try_later = False
176        if self.writers_in_progress or self.writers_pending:
177            if (not self.writers_in_progress) and self.writers_pending and (RWOperation.write == self.last_operation) and self.check_readers_in_progress_boundaries(coro_id):
178                if apply:
179                    self.increase_readers_in_progress(coro_id)
180                    self.last_operation = RWOperation.read
181            else:
182                need_to_try_later = True
183        else:
184            if self.check_readers_in_progress_boundaries(coro_id):
185                if apply:
186                    self.increase_readers_in_progress(coro_id)
187                    self.last_operation = RWOperation.read
188            else:
189                need_to_try_later = True
190        
191        return need_to_try_later
192    
193    def test_remove(self) -> bool:
194        need_to_try_later = self.waiting_coroutines
195        return need_to_try_later
196
197
198class RWLockerContextManagerBase:
199    def __init__(self, core: RWLockerEntity) -> None:
200        self.core: RWLockerEntity = core
201        self.current_context_operation: Optional[RWOperation] = None
202    
203    def lockable(self, operation: Optional[RWOperation] = None) -> bool:
204        if operation is None:
205            operation = RWOperation.read
206        
207        if RWOperation.write == operation:
208            need_service_assistance = self.core.try_write_lock(self._interface.coro_id, False)
209        else:
210            need_service_assistance = self.core.try_read_lock(self._interface.coro_id, False)
211        
212        return not need_service_assistance
213    
214    def change_max_boundaries(self, max_writers_in_progress: int, max_readers_in_progress: int):
215        self.core.max_writers_in_progress = max_writers_in_progress
216        self.core.max_readers_in_progress = max_readers_in_progress
217        
218    def __call__(self, operation: Optional[RWOperation] = None):
219        if operation is None:
220            operation = RWOperation.read
221        
222        self.current_context_operation = operation
223        return self
224    
225
226class RWLockerContextManager(RWLockerContextManagerBase):
227    def __init__(self, core: RWLockerEntity, interface: Interface):
228        super().__init__(core)
229        self._interface = interface
230    
231    def __enter__(self):
232        if self.current_context_operation is None:
233            self.current_context_operation = RWOperation.read
234        
235        if RWOperation.write == self.current_context_operation:
236            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
237            if need_service_assistance:
238                self.core.writers_pending += 1
239                self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
240        else:
241            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
242            if need_service_assistance:
243                self.core.readers_pending += 1
244                self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))
245        
246        return self
247    
248    def __exit__(self, type, value: Exception, traceback):
249        if RWOperation.write == self.current_context_operation:
250            self.core.decrease_writers_in_progress(self._interface)
251        else:
252            self.core.decrease_readers_in_progress(self._interface)
253        
254        self.current_context_operation = None
255
256    async def __aenter__(self):
257        if self.current_context_operation is None:
258            self.current_context_operation = RWOperation.read
259        
260        if RWOperation.write == self.current_context_operation:
261            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
262            if need_service_assistance:
263                self.core.writers_pending += 1
264                await self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
265        else:
266            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
267            if need_service_assistance:
268                self.core.readers_pending += 1
269                await self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))        
270        
271        return self
272
273    async def __aexit__(self, type, value, traceback):
274        if RWOperation.write == self.current_context_operation:
275            self.core.decrease_writers_in_progress(self._interface)
276        else:
277            self.core.decrease_readers_in_progress(self._interface)
278        
279        self.current_context_operation = None
280
281
282class FakeRWLockerContextManager(RWLockerContextManagerBase):
283    def __init__(self, core: RWLockerEntity):
284        super().__init__(core)
285    
286    def __enter__(self):
287        if self.current_context_operation is None:
288            self.current_context_operation = RWOperation.read
289        
290        if RWOperation.write == self.current_context_operation:
291            self.core.increase_writers_in_progress()
292            self.core.last_operation = RWOperation.write
293        else:
294            self.core.increase_readers_in_progress()
295            self.core.last_operation = RWOperation.read
296        
297        return self
298    
299    def __exit__(self, type, value: Exception, traceback):
300        if RWOperation.write == self.current_context_operation:
301            self.core.decrease_writers_in_progress()
302        else:
303            self.core.decrease_readers_in_progress()
304        
305        self.current_context_operation = None
306
307    async def __aenter__(self):
308        if self.current_context_operation is None:
309            self.current_context_operation = RWOperation.read
310        
311        if RWOperation.write == self.current_context_operation:
312            self.core.increase_writers_in_progress()
313            self.core.last_operation = RWOperation.write
314        else:
315            self.core.increase_readers_in_progress()
316            self.core.last_operation = RWOperation.read
317        
318        return self
319
320    async def __aexit__(self, type, value, traceback):
321        if RWOperation.write == self.current_context_operation:
322            self.core.decrease_writers_in_progress()
323        else:
324            self.core.decrease_readers_in_progress()
325        
326        self.current_context_operation = None
327
328
329class UnknownLockerEntity(Exception):
330    pass
331
332
333class RWLocker(Service, EntityStatsMixin):
334    def __init__(self, loop: CoroSchedulerType):
335        super(RWLocker, self).__init__(loop)
336
337        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
338
339        self._request_workers = {
340            0: self._on_register,
341            1: self._on_deregister,
342            2: self._on_wait_for_write,
343            3: self._on_wait_for_read,
344        }
345        
346        self.locker_entities: Dict[Hashable, RWLockerEntity] = dict()
347        self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = dict()
348        
349        self.remove_entity_requests: Dict[CoroID, Hashable] = dict()
350        self.waiting_for_write_requests: Dict[CoroID, Hashable] = dict()
351        self.waiting_for_read_requests: Dict[CoroID, Hashable] = dict()
352
353    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
354        return type(self).__name__, {
355            'locker entities num': len(self.locker_entities),
356            'affected coroutines num': len(self.entities_by_coroutine),
357            'waiting for write requests num': len(self.waiting_for_write_requests),
358            'waiting_for_read_requests num': len(self.waiting_for_read_requests),
359        }
360
361    def single_task_registration_or_immediate_processing(self, request: Optional[RWLockerRequest]=None
362                                                         ) -> ServiceProcessingResponse:
363        if request is not None:
364            return self.resolve_request(request)
365        return True, None, None
366
367    def full_processing_iteration(self):
368        # entities_waiting_for_remove
369        processed_coro_ids: Set[CoroID] = set()
370        for coro_id, entity_id in self.remove_entity_requests.items():
371            if entity_id not in self.locker_entities:
372                self.register_response(coro_id, False, None)
373                processed_coro_ids.add(coro_id)
374                continue
375            
376            entity = self.locker_entities[entity_id]
377            need_to_try_later = entity.test_remove()
378            if need_to_try_later:
379                continue
380            else:
381                del self.locker_entities[entity_id]
382                processed_coro_ids.add(coro_id)
383        
384        for coro_id in processed_coro_ids:
385            del self.remove_entity_requests[coro_id]
386        
387        # entities_waiting_for_write
388        processed_coro_ids: Set[CoroID] = set()
389        for coro_id, entity_id in self.waiting_for_write_requests.items():
390            if entity_id not in self.locker_entities:
391                self.register_response(coro_id, None, UnknownLockerEntity)
392                processed_coro_ids.add(coro_id)
393                continue
394            
395            entity = self.locker_entities[entity_id]
396            need_to_try_later = entity.try_write_lock(coro_id)
397            if need_to_try_later:
398                continue
399            else:
400                entity.writers_pending -= 1
401                if coro_id in entity.waiting_coroutines:
402                    entity.waiting_coroutines.remove(coro_id)
403                
404                self.register_response(coro_id, None, None)
405                processed_coro_ids.add(coro_id)
406        
407        for coro_id in processed_coro_ids:
408            del self.waiting_for_write_requests[coro_id]
409                    
410        # entities_waiting_for_read
411        processed_coro_ids: Set[CoroID] = set()
412        for coro_id, entity_id in self.waiting_for_read_requests.items():
413            if entity_id not in self.locker_entities:
414                self.register_response(coro_id, None, UnknownLockerEntity)
415                processed_coro_ids.add(coro_id)
416                continue
417            
418            entity = self.locker_entities[entity_id]
419            need_to_try_later = entity.try_read_lock(coro_id)
420            if need_to_try_later:
421                continue
422            else:
423                entity.readers_pending -= 1
424                if coro_id in entity.waiting_coroutines:
425                    entity.waiting_coroutines.remove(coro_id)
426                
427                self.register_response(coro_id, None, None)
428                processed_coro_ids.add(coro_id)
429        
430        for coro_id in processed_coro_ids:
431            del self.waiting_for_read_requests[coro_id]
432        
433        # general
434        if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
435            self.make_dead()
436
437    def in_work(self) -> bool:
438        result: bool = bool(self.remove_entity_requests) or bool(self.waiting_for_write_requests) or bool(self.waiting_for_read_requests)
439        return self.thrifty_in_work(result)
440    
441    def get_locker_entity(self, entity_id: Hashable):
442        return self.locker_entities.get(entity_id)
443
444    def _on_register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> ServiceProcessingResponse:
445        if entity_id not in self.locker_entities:
446            self.locker_entities[entity_id] = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, type(self))
447        entity: RWLockerEntity = self.locker_entities[entity_id]
448        
449        coro_id = self.current_caller_coro_info.coro.coro_id
450        entity.related_coroutines.add(coro_id)
451        if coro_id not in self.entities_by_coroutine:
452            self.entities_by_coroutine[coro_id] = set()
453        
454        self.entities_by_coroutine[coro_id].add(entity_id)
455        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
456        context_manager: RWLockerContextManager = RWLockerContextManager(entity, self.current_caller_coro_info.coro.interface)
457        return True, context_manager, None
458
459    def _on_deregister(self, entity_id: Hashable, safe: bool = True) -> ServiceProcessingResponse:
460        result = None
461        if safe:
462            self.remove_entity_requests[self.current_caller_coro_info.coro.coro_id] = entity_id
463            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
464            self.make_live()
465            return False, None, None
466        else:
467            if entity_id in self.locker_entities:
468                entity = self.locker_entities[entity_id]
469                del self.locker_entities[entity_id]
470                for related_coro_id in entity.related_coroutines:
471                    coroutine_entities = self.entities_by_coroutine[related_coro_id]
472                    if entity_id in coroutine_entities:
473                        coroutine_entities.remove(entity_id)
474                
475                result = True
476            else:
477                result = False
478            
479            return True, result, None
480
481    def _on_wait_for_write(self, entity_id: Hashable) -> ServiceProcessingResponse:
482        if entity_id not in self.locker_entities:
483            return True, None, UnknownLockerEntity()
484        
485        coro_id = self.current_caller_coro_info.coro.coro_id
486        self.waiting_for_write_requests[coro_id] = entity_id
487        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
488        entity = self.locker_entities[entity_id]
489        entity.waiting_coroutines.add(coro_id)
490        self.make_live()
491        return False, None, None
492
493    def _on_wait_for_read(self, entity_id: Hashable) -> ServiceProcessingResponse:
494        if entity_id not in self.locker_entities:
495            return True, None, UnknownLockerEntity()
496        
497        coro_id = self.current_caller_coro_info.coro.coro_id
498        self.waiting_for_read_requests[coro_id] = entity_id
499        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
500        entity = self.locker_entities[entity_id]
501        entity.waiting_coroutines.add(coro_id)
502        self.make_live()
503        return False, None, None
504
505    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
506        coro_id = coro.coro_id
507        if coro_id in self.entities_by_coroutine:
508            entities = self.entities_by_coroutine[coro_id]
509            del self.entities_by_coroutine[coro_id]
510            for entity_id in entities:
511                if entity_id in self.locker_entities:
512                    entity = self.locker_entities[entity_id]
513                    if coro_id in entity.related_coroutines:
514                        entity.related_coroutines.remove(coro_id)
515                    
516                    if coro_id in entity.waiting_coroutines:
517                        entity.waiting_coroutines.remove(coro_id)
518        
519        if coro_id in self.remove_entity_requests:
520            del self.remove_entity_requests[coro_id]
521        
522        if coro_id in self.waiting_for_write_requests:
523            del self.waiting_for_write_requests[coro_id]
524        
525        if coro_id in self.waiting_for_read_requests:
526            del self.waiting_for_read_requests[coro_id]
527
528        return False
529
530    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
531        return self._on_coro_del_handler_global(coro)
532
533
534RWLockerRequest.default_service_type = RWLocker
535
536
537def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
538    loop = CoroScheduler.current_loop()
539    if loop is None:
540        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
541
542    interface = loop.current_interface()
543    if interface is None:
544        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
545
546    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
547    if locker_entity is None:
548        lock = interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
549    else:
550        lock = RWLockerContextManager(locker_entity, interface)
551    
552    return lock
553
554
555grwl = get_rw_lock
556
557
558async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
559    loop = CoroScheduler.current_loop()
560    if loop is None:
561        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
562
563    interface = loop.current_interface()
564    if interface is None:
565        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
566
567    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
568    if locker_entity is None:
569        lock = await interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
570    else:
571        lock = RWLockerContextManager(locker_entity, interface)
572    
573    return lock
574
575
576agrwl = aget_rw_lock
class RWOperation(enum.Enum):
52class RWOperation(Enum):
53    read = 0
54    write = 1

An enumeration.

read = <RWOperation.read: 0>
write = <RWOperation.write: 1>
Inherited Members
enum.Enum
name
value
class RWLockerRequest(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest):
57class RWLockerRequest(ServiceRequest):
58    def register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> 'RWLockerContextManager':
59        return self._save(0, entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
60
61    def deregister(self, entity_id: Hashable, safe: bool = True) -> bool:
62        return self._save(1, entity_id, safe)
63    
64    def wait_for_write(self, entity_id: Hashable) -> None:
65        return self._save(2, entity_id)
66    
67    def wait_for_read(self, entity_id: Hashable) -> None:
68        return self._save(3, entity_id)
def register( self, entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> RWLockerContextManager:
58    def register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> 'RWLockerContextManager':
59        return self._save(0, entity_id, max_writers_in_progress, max_readers_in_progress, recursive)
def deregister(self, entity_id: typing.Hashable, safe: bool = True) -> bool:
61    def deregister(self, entity_id: Hashable, safe: bool = True) -> bool:
62        return self._save(1, entity_id, safe)
def wait_for_write(self, entity_id: typing.Hashable) -> None:
64    def wait_for_write(self, entity_id: Hashable) -> None:
65        return self._save(2, entity_id)
def wait_for_read(self, entity_id: typing.Hashable) -> None:
67    def wait_for_read(self, entity_id: Hashable) -> None:
68        return self._save(3, entity_id)
default_service_type: Union[type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service], NoneType] = <class 'RWLocker'>
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ServiceRequest
default__request__type__
request_type
args
kwargs
provide_to_request_handler
interface
i
async_interface
ai
class RWLockerEntity:
 71class RWLockerEntity:
 72    def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
 73        self.entity_id: Hashable = entity_id
 74        self.recursive: bool = recursive
 75        self.service = service
 76        self.writers_pending: int = 0
 77        self.writers_in_progress_dict: Dict[CoroID, int] = dict()
 78        self.writers_in_progress: int = 0
 79        self.max_writers_in_progress: int = max_writers_in_progress  # Must be edited directly from coroutine. In order to eliminate new writers arrived during the end of the current loop iteration
 80        self.readers_pending: int = 0
 81        self.readers_in_progress_dict: Dict[CoroID, int] = dict()
 82        self.readers_in_progress: int = 0
 83        self.max_readers_in_progress: int = max_readers_in_progress  # Must be edited directly from coroutine. In order to eliminate new readers arrived during the end of the current loop iteration
 84        self.last_operation: RWOperation = RWOperation.read  # Default is 'RWOperation.read' in order to force 'write' as a first operation among several first concurent operations
 85        self.waiting_coroutines: Set[CoroID] = set()
 86        self.related_coroutines: Set[CoroID] = set()
 87
 88    def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
 89        if self.recursive:
 90            if 0 > self.max_writers_in_progress:
 91                return True
 92            else:
 93                if coro_id in self.writers_in_progress_dict:
 94                    return True
 95                else:
 96                    return self.writers_in_progress < self.max_writers_in_progress
 97        else:
 98            if 0 > self.max_writers_in_progress:
 99                return True
100            else:
101                return self.writers_in_progress < self.max_writers_in_progress
102    
103    def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
104        if self.recursive:
105            if coro_id not in self.writers_in_progress_dict:
106                self.writers_in_progress_dict[coro_id] = 0
107                self.writers_in_progress += 1
108            
109            self.writers_in_progress_dict[coro_id] += 1
110        else:
111            self.writers_in_progress += 1
112    
113    def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
114        if self.recursive:
115            self.writers_in_progress_dict[coro_id] -= 1
116            if 0 >= self.writers_in_progress_dict[coro_id]:
117                del self.writers_in_progress_dict[coro_id]
118                self.writers_in_progress -= 1
119        else:
120            self.writers_in_progress -= 1
121
122    def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
123        if self.recursive:
124            if 0 > self.max_readers_in_progress:
125                return True
126            else:
127                if coro_id in self.readers_in_progress_dict:
128                    return True
129                else:
130                    return self.readers_in_progress < self.max_readers_in_progress
131        else:
132            if 0 > self.max_readers_in_progress:
133                return True
134            else:
135                return self.readers_in_progress < self.max_readers_in_progress
136    
137    def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
138        if self.recursive:
139            if coro_id not in self.readers_in_progress_dict:
140                self.readers_in_progress_dict[coro_id] = 0
141                self.readers_in_progress += 1
142            
143            self.readers_in_progress_dict[coro_id] += 1
144        else:
145            self.readers_in_progress += 1
146    
147    def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
148        if self.recursive:
149            self.readers_in_progress_dict[coro_id] -= 1
150            if 0 >= self.readers_in_progress_dict[coro_id]:
151                del self.readers_in_progress_dict[coro_id]
152                self.readers_in_progress -= 1
153        else:
154            self.readers_in_progress -= 1
155
156    def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
157        need_to_try_later = False
158        if self.readers_in_progress or self.readers_pending:
159            if (not self.readers_in_progress) and self.readers_pending and (RWOperation.read == self.last_operation) and self.check_writers_in_progress_boundaries(coro_id):
160                if apply:
161                    self.increase_writers_in_progress(coro_id)
162                    self.last_operation = RWOperation.write
163            else:
164                need_to_try_later = True
165        else:
166            if self.check_writers_in_progress_boundaries(coro_id):
167                if apply:
168                    self.increase_writers_in_progress(coro_id)
169                    self.last_operation = RWOperation.write
170            else:
171                need_to_try_later = True
172        
173        return need_to_try_later
174
175    def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
176        need_to_try_later = False
177        if self.writers_in_progress or self.writers_pending:
178            if (not self.writers_in_progress) and self.writers_pending and (RWOperation.write == self.last_operation) and self.check_readers_in_progress_boundaries(coro_id):
179                if apply:
180                    self.increase_readers_in_progress(coro_id)
181                    self.last_operation = RWOperation.read
182            else:
183                need_to_try_later = True
184        else:
185            if self.check_readers_in_progress_boundaries(coro_id):
186                if apply:
187                    self.increase_readers_in_progress(coro_id)
188                    self.last_operation = RWOperation.read
189            else:
190                need_to_try_later = True
191        
192        return need_to_try_later
193    
194    def test_remove(self) -> bool:
195        need_to_try_later = self.waiting_coroutines
196        return need_to_try_later
RWLockerEntity( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: typing.Type[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service])
72    def __init__(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool, service: Type[Service]):
73        self.entity_id: Hashable = entity_id
74        self.recursive: bool = recursive
75        self.service = service
76        self.writers_pending: int = 0
77        self.writers_in_progress_dict: Dict[CoroID, int] = dict()
78        self.writers_in_progress: int = 0
79        self.max_writers_in_progress: int = max_writers_in_progress  # Must be edited directly from coroutine. In order to eliminate new writers arrived during the end of the current loop iteration
80        self.readers_pending: int = 0
81        self.readers_in_progress_dict: Dict[CoroID, int] = dict()
82        self.readers_in_progress: int = 0
83        self.max_readers_in_progress: int = max_readers_in_progress  # Must be edited directly from coroutine. In order to eliminate new readers arrived during the end of the current loop iteration
84        self.last_operation: RWOperation = RWOperation.read  # Default is 'RWOperation.read' in order to force 'write' as a first operation among several first concurent operations
85        self.waiting_coroutines: Set[CoroID] = set()
86        self.related_coroutines: Set[CoroID] = set()
entity_id: Hashable
recursive: bool
service
writers_pending: int
writers_in_progress_dict: Dict[int, int]
writers_in_progress: int
max_writers_in_progress: int
readers_pending: int
readers_in_progress_dict: Dict[int, int]
readers_in_progress: int
max_readers_in_progress: int
last_operation: RWOperation
waiting_coroutines: Set[int]
related_coroutines: Set[int]
def check_writers_in_progress_boundaries(self, coro_id: typing.Union[int, NoneType] = None) -> bool:
 88    def check_writers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
 89        if self.recursive:
 90            if 0 > self.max_writers_in_progress:
 91                return True
 92            else:
 93                if coro_id in self.writers_in_progress_dict:
 94                    return True
 95                else:
 96                    return self.writers_in_progress < self.max_writers_in_progress
 97        else:
 98            if 0 > self.max_writers_in_progress:
 99                return True
100            else:
101                return self.writers_in_progress < self.max_writers_in_progress
def increase_writers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
103    def increase_writers_in_progress(self, coro_id: Optional[CoroID] = None):
104        if self.recursive:
105            if coro_id not in self.writers_in_progress_dict:
106                self.writers_in_progress_dict[coro_id] = 0
107                self.writers_in_progress += 1
108            
109            self.writers_in_progress_dict[coro_id] += 1
110        else:
111            self.writers_in_progress += 1
def decrease_writers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
113    def decrease_writers_in_progress(self, coro_id: Optional[CoroID] = None):
114        if self.recursive:
115            self.writers_in_progress_dict[coro_id] -= 1
116            if 0 >= self.writers_in_progress_dict[coro_id]:
117                del self.writers_in_progress_dict[coro_id]
118                self.writers_in_progress -= 1
119        else:
120            self.writers_in_progress -= 1
def check_readers_in_progress_boundaries(self, coro_id: typing.Union[int, NoneType] = None) -> bool:
122    def check_readers_in_progress_boundaries(self, coro_id: Optional[CoroID] = None) -> bool:
123        if self.recursive:
124            if 0 > self.max_readers_in_progress:
125                return True
126            else:
127                if coro_id in self.readers_in_progress_dict:
128                    return True
129                else:
130                    return self.readers_in_progress < self.max_readers_in_progress
131        else:
132            if 0 > self.max_readers_in_progress:
133                return True
134            else:
135                return self.readers_in_progress < self.max_readers_in_progress
def increase_readers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
137    def increase_readers_in_progress(self, coro_id: Optional[CoroID] = None):
138        if self.recursive:
139            if coro_id not in self.readers_in_progress_dict:
140                self.readers_in_progress_dict[coro_id] = 0
141                self.readers_in_progress += 1
142            
143            self.readers_in_progress_dict[coro_id] += 1
144        else:
145            self.readers_in_progress += 1
def decrease_readers_in_progress(self, coro_id: typing.Union[int, NoneType] = None):
147    def decrease_readers_in_progress(self, coro_id: Optional[CoroID] = None):
148        if self.recursive:
149            self.readers_in_progress_dict[coro_id] -= 1
150            if 0 >= self.readers_in_progress_dict[coro_id]:
151                del self.readers_in_progress_dict[coro_id]
152                self.readers_in_progress -= 1
153        else:
154            self.readers_in_progress -= 1
def try_write_lock( self, coro_id: typing.Union[int, NoneType] = None, apply: bool = True) -> bool:
156    def try_write_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
157        need_to_try_later = False
158        if self.readers_in_progress or self.readers_pending:
159            if (not self.readers_in_progress) and self.readers_pending and (RWOperation.read == self.last_operation) and self.check_writers_in_progress_boundaries(coro_id):
160                if apply:
161                    self.increase_writers_in_progress(coro_id)
162                    self.last_operation = RWOperation.write
163            else:
164                need_to_try_later = True
165        else:
166            if self.check_writers_in_progress_boundaries(coro_id):
167                if apply:
168                    self.increase_writers_in_progress(coro_id)
169                    self.last_operation = RWOperation.write
170            else:
171                need_to_try_later = True
172        
173        return need_to_try_later
def try_read_lock( self, coro_id: typing.Union[int, NoneType] = None, apply: bool = True) -> bool:
175    def try_read_lock(self, coro_id: Optional[CoroID] = None, apply: bool = True) -> bool:
176        need_to_try_later = False
177        if self.writers_in_progress or self.writers_pending:
178            if (not self.writers_in_progress) and self.writers_pending and (RWOperation.write == self.last_operation) and self.check_readers_in_progress_boundaries(coro_id):
179                if apply:
180                    self.increase_readers_in_progress(coro_id)
181                    self.last_operation = RWOperation.read
182            else:
183                need_to_try_later = True
184        else:
185            if self.check_readers_in_progress_boundaries(coro_id):
186                if apply:
187                    self.increase_readers_in_progress(coro_id)
188                    self.last_operation = RWOperation.read
189            else:
190                need_to_try_later = True
191        
192        return need_to_try_later
def test_remove(self) -> bool:
194    def test_remove(self) -> bool:
195        need_to_try_later = self.waiting_coroutines
196        return need_to_try_later
class RWLockerContextManager(RWLockerContextManagerBase):
227class RWLockerContextManager(RWLockerContextManagerBase):
228    def __init__(self, core: RWLockerEntity, interface: Interface):
229        super().__init__(core)
230        self._interface = interface
231    
232    def __enter__(self):
233        if self.current_context_operation is None:
234            self.current_context_operation = RWOperation.read
235        
236        if RWOperation.write == self.current_context_operation:
237            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
238            if need_service_assistance:
239                self.core.writers_pending += 1
240                self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
241        else:
242            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
243            if need_service_assistance:
244                self.core.readers_pending += 1
245                self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))
246        
247        return self
248    
249    def __exit__(self, type, value: Exception, traceback):
250        if RWOperation.write == self.current_context_operation:
251            self.core.decrease_writers_in_progress(self._interface)
252        else:
253            self.core.decrease_readers_in_progress(self._interface)
254        
255        self.current_context_operation = None
256
257    async def __aenter__(self):
258        if self.current_context_operation is None:
259            self.current_context_operation = RWOperation.read
260        
261        if RWOperation.write == self.current_context_operation:
262            need_service_assistance = self.core.try_write_lock(self._interface.coro_id)
263            if need_service_assistance:
264                self.core.writers_pending += 1
265                await self._interface(self.core.service, RWLockerRequest().wait_for_write(self.core.entity_id))
266        else:
267            need_service_assistance = self.core.try_read_lock(self._interface.coro_id)
268            if need_service_assistance:
269                self.core.readers_pending += 1
270                await self._interface(self.core.service, RWLockerRequest().wait_for_read(self.core.entity_id))        
271        
272        return self
273
274    async def __aexit__(self, type, value, traceback):
275        if RWOperation.write == self.current_context_operation:
276            self.core.decrease_writers_in_progress(self._interface)
277        else:
278            self.core.decrease_readers_in_progress(self._interface)
279        
280        self.current_context_operation = None
RWLockerContextManager( core: RWLockerEntity, interface: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface)
228    def __init__(self, core: RWLockerEntity, interface: Interface):
229        super().__init__(core)
230        self._interface = interface
class FakeRWLockerContextManager(RWLockerContextManagerBase):
283class FakeRWLockerContextManager(RWLockerContextManagerBase):
284    def __init__(self, core: RWLockerEntity):
285        super().__init__(core)
286    
287    def __enter__(self):
288        if self.current_context_operation is None:
289            self.current_context_operation = RWOperation.read
290        
291        if RWOperation.write == self.current_context_operation:
292            self.core.increase_writers_in_progress()
293            self.core.last_operation = RWOperation.write
294        else:
295            self.core.increase_readers_in_progress()
296            self.core.last_operation = RWOperation.read
297        
298        return self
299    
300    def __exit__(self, type, value: Exception, traceback):
301        if RWOperation.write == self.current_context_operation:
302            self.core.decrease_writers_in_progress()
303        else:
304            self.core.decrease_readers_in_progress()
305        
306        self.current_context_operation = None
307
308    async def __aenter__(self):
309        if self.current_context_operation is None:
310            self.current_context_operation = RWOperation.read
311        
312        if RWOperation.write == self.current_context_operation:
313            self.core.increase_writers_in_progress()
314            self.core.last_operation = RWOperation.write
315        else:
316            self.core.increase_readers_in_progress()
317            self.core.last_operation = RWOperation.read
318        
319        return self
320
321    async def __aexit__(self, type, value, traceback):
322        if RWOperation.write == self.current_context_operation:
323            self.core.decrease_writers_in_progress()
324        else:
325            self.core.decrease_readers_in_progress()
326        
327        self.current_context_operation = None
FakeRWLockerContextManager( core: RWLockerEntity)
284    def __init__(self, core: RWLockerEntity):
285        super().__init__(core)
class UnknownLockerEntity(builtins.Exception):
330class UnknownLockerEntity(Exception):
331    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class RWLocker(cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin):
334class RWLocker(Service, EntityStatsMixin):
335    def __init__(self, loop: CoroSchedulerType):
336        super(RWLocker, self).__init__(loop)
337
338        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
339
340        self._request_workers = {
341            0: self._on_register,
342            1: self._on_deregister,
343            2: self._on_wait_for_write,
344            3: self._on_wait_for_read,
345        }
346        
347        self.locker_entities: Dict[Hashable, RWLockerEntity] = dict()
348        self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = dict()
349        
350        self.remove_entity_requests: Dict[CoroID, Hashable] = dict()
351        self.waiting_for_write_requests: Dict[CoroID, Hashable] = dict()
352        self.waiting_for_read_requests: Dict[CoroID, Hashable] = dict()
353
354    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
355        return type(self).__name__, {
356            'locker entities num': len(self.locker_entities),
357            'affected coroutines num': len(self.entities_by_coroutine),
358            'waiting for write requests num': len(self.waiting_for_write_requests),
359            'waiting_for_read_requests num': len(self.waiting_for_read_requests),
360        }
361
362    def single_task_registration_or_immediate_processing(self, request: Optional[RWLockerRequest]=None
363                                                         ) -> ServiceProcessingResponse:
364        if request is not None:
365            return self.resolve_request(request)
366        return True, None, None
367
368    def full_processing_iteration(self):
369        # entities_waiting_for_remove
370        processed_coro_ids: Set[CoroID] = set()
371        for coro_id, entity_id in self.remove_entity_requests.items():
372            if entity_id not in self.locker_entities:
373                self.register_response(coro_id, False, None)
374                processed_coro_ids.add(coro_id)
375                continue
376            
377            entity = self.locker_entities[entity_id]
378            need_to_try_later = entity.test_remove()
379            if need_to_try_later:
380                continue
381            else:
382                del self.locker_entities[entity_id]
383                processed_coro_ids.add(coro_id)
384        
385        for coro_id in processed_coro_ids:
386            del self.remove_entity_requests[coro_id]
387        
388        # entities_waiting_for_write
389        processed_coro_ids: Set[CoroID] = set()
390        for coro_id, entity_id in self.waiting_for_write_requests.items():
391            if entity_id not in self.locker_entities:
392                self.register_response(coro_id, None, UnknownLockerEntity)
393                processed_coro_ids.add(coro_id)
394                continue
395            
396            entity = self.locker_entities[entity_id]
397            need_to_try_later = entity.try_write_lock(coro_id)
398            if need_to_try_later:
399                continue
400            else:
401                entity.writers_pending -= 1
402                if coro_id in entity.waiting_coroutines:
403                    entity.waiting_coroutines.remove(coro_id)
404                
405                self.register_response(coro_id, None, None)
406                processed_coro_ids.add(coro_id)
407        
408        for coro_id in processed_coro_ids:
409            del self.waiting_for_write_requests[coro_id]
410                    
411        # entities_waiting_for_read
412        processed_coro_ids: Set[CoroID] = set()
413        for coro_id, entity_id in self.waiting_for_read_requests.items():
414            if entity_id not in self.locker_entities:
415                self.register_response(coro_id, None, UnknownLockerEntity)
416                processed_coro_ids.add(coro_id)
417                continue
418            
419            entity = self.locker_entities[entity_id]
420            need_to_try_later = entity.try_read_lock(coro_id)
421            if need_to_try_later:
422                continue
423            else:
424                entity.readers_pending -= 1
425                if coro_id in entity.waiting_coroutines:
426                    entity.waiting_coroutines.remove(coro_id)
427                
428                self.register_response(coro_id, None, None)
429                processed_coro_ids.add(coro_id)
430        
431        for coro_id in processed_coro_ids:
432            del self.waiting_for_read_requests[coro_id]
433        
434        # general
435        if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
436            self.make_dead()
437
438    def in_work(self) -> bool:
439        result: bool = bool(self.remove_entity_requests) or bool(self.waiting_for_write_requests) or bool(self.waiting_for_read_requests)
440        return self.thrifty_in_work(result)
441    
442    def get_locker_entity(self, entity_id: Hashable):
443        return self.locker_entities.get(entity_id)
444
445    def _on_register(self, entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> ServiceProcessingResponse:
446        if entity_id not in self.locker_entities:
447            self.locker_entities[entity_id] = RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, type(self))
448        entity: RWLockerEntity = self.locker_entities[entity_id]
449        
450        coro_id = self.current_caller_coro_info.coro.coro_id
451        entity.related_coroutines.add(coro_id)
452        if coro_id not in self.entities_by_coroutine:
453            self.entities_by_coroutine[coro_id] = set()
454        
455        self.entities_by_coroutine[coro_id].add(entity_id)
456        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
457        context_manager: RWLockerContextManager = RWLockerContextManager(entity, self.current_caller_coro_info.coro.interface)
458        return True, context_manager, None
459
460    def _on_deregister(self, entity_id: Hashable, safe: bool = True) -> ServiceProcessingResponse:
461        result = None
462        if safe:
463            self.remove_entity_requests[self.current_caller_coro_info.coro.coro_id] = entity_id
464            self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
465            self.make_live()
466            return False, None, None
467        else:
468            if entity_id in self.locker_entities:
469                entity = self.locker_entities[entity_id]
470                del self.locker_entities[entity_id]
471                for related_coro_id in entity.related_coroutines:
472                    coroutine_entities = self.entities_by_coroutine[related_coro_id]
473                    if entity_id in coroutine_entities:
474                        coroutine_entities.remove(entity_id)
475                
476                result = True
477            else:
478                result = False
479            
480            return True, result, None
481
482    def _on_wait_for_write(self, entity_id: Hashable) -> ServiceProcessingResponse:
483        if entity_id not in self.locker_entities:
484            return True, None, UnknownLockerEntity()
485        
486        coro_id = self.current_caller_coro_info.coro.coro_id
487        self.waiting_for_write_requests[coro_id] = entity_id
488        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
489        entity = self.locker_entities[entity_id]
490        entity.waiting_coroutines.add(coro_id)
491        self.make_live()
492        return False, None, None
493
494    def _on_wait_for_read(self, entity_id: Hashable) -> ServiceProcessingResponse:
495        if entity_id not in self.locker_entities:
496            return True, None, UnknownLockerEntity()
497        
498        coro_id = self.current_caller_coro_info.coro.coro_id
499        self.waiting_for_read_requests[coro_id] = entity_id
500        self.current_caller_coro_info.coro.add_on_coro_del_handler(self._on_coro_del_handler)
501        entity = self.locker_entities[entity_id]
502        entity.waiting_coroutines.add(coro_id)
503        self.make_live()
504        return False, None, None
505
506    def _on_coro_del_handler_global(self, coro: CoroWrapperBase) -> bool:
507        coro_id = coro.coro_id
508        if coro_id in self.entities_by_coroutine:
509            entities = self.entities_by_coroutine[coro_id]
510            del self.entities_by_coroutine[coro_id]
511            for entity_id in entities:
512                if entity_id in self.locker_entities:
513                    entity = self.locker_entities[entity_id]
514                    if coro_id in entity.related_coroutines:
515                        entity.related_coroutines.remove(coro_id)
516                    
517                    if coro_id in entity.waiting_coroutines:
518                        entity.waiting_coroutines.remove(coro_id)
519        
520        if coro_id in self.remove_entity_requests:
521            del self.remove_entity_requests[coro_id]
522        
523        if coro_id in self.waiting_for_write_requests:
524            del self.waiting_for_write_requests[coro_id]
525        
526        if coro_id in self.waiting_for_read_requests:
527            del self.waiting_for_read_requests[coro_id]
528
529        return False
530
531    def _on_coro_del_handler(self, coro: CoroWrapperBase) -> bool:
532        return self._on_coro_del_handler_global(coro)
RWLocker( loop: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerGreenlet, cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.CoroSchedulerAwaitable])
335    def __init__(self, loop: CoroSchedulerType):
336        super(RWLocker, self).__init__(loop)
337
338        # loop.add_global_on_coro_del_handler(self._on_coro_del_handler_global)  # Todo: switch to local coro del handler
339
340        self._request_workers = {
341            0: self._on_register,
342            1: self._on_deregister,
343            2: self._on_wait_for_write,
344            3: self._on_wait_for_read,
345        }
346        
347        self.locker_entities: Dict[Hashable, RWLockerEntity] = dict()
348        self.entities_by_coroutine: Dict[CoroID, Set[Hashable]] = dict()
349        
350        self.remove_entity_requests: Dict[CoroID, Hashable] = dict()
351        self.waiting_for_write_requests: Dict[CoroID, Hashable] = dict()
352        self.waiting_for_read_requests: Dict[CoroID, Hashable] = dict()
locker_entities: Dict[Hashable, RWLockerEntity]
entities_by_coroutine: Dict[int, Set[Hashable]]
remove_entity_requests: Dict[int, Hashable]
waiting_for_write_requests: Dict[int, Hashable]
waiting_for_read_requests: Dict[int, Hashable]
def get_entity_stats( self, stats_level: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin.StatsLevel = <StatsLevel.debug: 1>) -> Tuple[str, Dict[str, Any]]:
354    def get_entity_stats(self, stats_level: 'EntityStatsMixin.StatsLevel' = EntityStatsMixin.StatsLevel.debug) -> Tuple[str, Dict[str, Any]]:
355        return type(self).__name__, {
356            'locker entities num': len(self.locker_entities),
357            'affected coroutines num': len(self.entities_by_coroutine),
358            'waiting for write requests num': len(self.waiting_for_write_requests),
359            'waiting_for_read_requests num': len(self.waiting_for_read_requests),
360        }
def single_task_registration_or_immediate_processing( self, request: typing.Union[RWLockerRequest, NoneType] = None) -> Tuple[bool, Any, Union[BaseException, NoneType]]:
362    def single_task_registration_or_immediate_processing(self, request: Optional[RWLockerRequest]=None
363                                                         ) -> ServiceProcessingResponse:
364        if request is not None:
365            return self.resolve_request(request)
366        return True, None, None
def full_processing_iteration(self):
368    def full_processing_iteration(self):
369        # entities_waiting_for_remove
370        processed_coro_ids: Set[CoroID] = set()
371        for coro_id, entity_id in self.remove_entity_requests.items():
372            if entity_id not in self.locker_entities:
373                self.register_response(coro_id, False, None)
374                processed_coro_ids.add(coro_id)
375                continue
376            
377            entity = self.locker_entities[entity_id]
378            need_to_try_later = entity.test_remove()
379            if need_to_try_later:
380                continue
381            else:
382                del self.locker_entities[entity_id]
383                processed_coro_ids.add(coro_id)
384        
385        for coro_id in processed_coro_ids:
386            del self.remove_entity_requests[coro_id]
387        
388        # entities_waiting_for_write
389        processed_coro_ids: Set[CoroID] = set()
390        for coro_id, entity_id in self.waiting_for_write_requests.items():
391            if entity_id not in self.locker_entities:
392                self.register_response(coro_id, None, UnknownLockerEntity)
393                processed_coro_ids.add(coro_id)
394                continue
395            
396            entity = self.locker_entities[entity_id]
397            need_to_try_later = entity.try_write_lock(coro_id)
398            if need_to_try_later:
399                continue
400            else:
401                entity.writers_pending -= 1
402                if coro_id in entity.waiting_coroutines:
403                    entity.waiting_coroutines.remove(coro_id)
404                
405                self.register_response(coro_id, None, None)
406                processed_coro_ids.add(coro_id)
407        
408        for coro_id in processed_coro_ids:
409            del self.waiting_for_write_requests[coro_id]
410                    
411        # entities_waiting_for_read
412        processed_coro_ids: Set[CoroID] = set()
413        for coro_id, entity_id in self.waiting_for_read_requests.items():
414            if entity_id not in self.locker_entities:
415                self.register_response(coro_id, None, UnknownLockerEntity)
416                processed_coro_ids.add(coro_id)
417                continue
418            
419            entity = self.locker_entities[entity_id]
420            need_to_try_later = entity.try_read_lock(coro_id)
421            if need_to_try_later:
422                continue
423            else:
424                entity.readers_pending -= 1
425                if coro_id in entity.waiting_coroutines:
426                    entity.waiting_coroutines.remove(coro_id)
427                
428                self.register_response(coro_id, None, None)
429                processed_coro_ids.add(coro_id)
430        
431        for coro_id in processed_coro_ids:
432            del self.waiting_for_read_requests[coro_id]
433        
434        # general
435        if not (self.remove_entity_requests or self.waiting_for_write_requests or self.waiting_for_read_requests):
436            self.make_dead()
def in_work(self) -> bool:
438    def in_work(self) -> bool:
439        result: bool = bool(self.remove_entity_requests) or bool(self.waiting_for_write_requests) or bool(self.waiting_for_read_requests)
440        return self.thrifty_in_work(result)

Will be executed twice per iteration: once before and once after the full_processing_iteration() execution

Raises: NotImplementedError: _description_

Returns: bool: _description_

def get_locker_entity(self, entity_id: typing.Hashable):
442    def get_locker_entity(self, entity_id: Hashable):
443        return self.locker_entities.get(entity_id)
Inherited Members
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Service
current_caller_coro_info
iteration
make_response
register_response
put_task
resolve_request
try_resolve_request
in_forground_work
thrifty_in_work
time_left_before_next_event
is_low_latency
make_live
make_dead
service_id_impl
service_id
destroy
cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.EntityStatsMixin
StatsLevel
def get_rw_lock( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
538def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
539    loop = CoroScheduler.current_loop()
540    if loop is None:
541        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
542
543    interface = loop.current_interface()
544    if interface is None:
545        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
546
547    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
548    if locker_entity is None:
549        lock = interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
550    else:
551        lock = RWLockerContextManager(locker_entity, interface)
552    
553    return lock
def grwl( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
538def get_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
539    loop = CoroScheduler.current_loop()
540    if loop is None:
541        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
542
543    interface = loop.current_interface()
544    if interface is None:
545        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
546
547    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
548    if locker_entity is None:
549        lock = interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
550    else:
551        lock = RWLockerContextManager(locker_entity, interface)
552    
553    return lock
async def aget_rw_lock( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
559async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
560    loop = CoroScheduler.current_loop()
561    if loop is None:
562        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
563
564    interface = loop.current_interface()
565    if interface is None:
566        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
567
568    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
569    if locker_entity is None:
570        lock = await interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
571    else:
572        lock = RWLockerContextManager(locker_entity, interface)
573    
574    return lock
async def agrwl( entity_id: typing.Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
559async def aget_rw_lock(entity_id: Hashable, max_writers_in_progress: int, max_readers_in_progress: int, recursive: bool = True) -> Union[RWLockerContextManager, FakeRWLockerContextManager]:
560    loop = CoroScheduler.current_loop()
561    if loop is None:
562        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running not from inside the loop
563
564    interface = loop.current_interface()
565    if interface is None:
566        return FakeRWLockerContextManager(RWLockerEntity(entity_id, max_writers_in_progress, max_readers_in_progress, recursive, RWLocker))  # running from Service
567
568    locker_entity = interface._loop.get_service_instance(RWLocker).get_locker_entity(entity_id)
569    if locker_entity is None:
570        lock = await interface(RWLocker, RWLockerRequest().register(entity_id, max_writers_in_progress, max_readers_in_progress, recursive))
571    else:
572        lock = RWLockerContextManager(locker_entity, interface)
573    
574    return lock