cengal.parallel_execution.coroutines.coro_tools.lock.versions.v_0.lock
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['Lock'] 20 21 22""" 23Module Docstring 24Docstrings: http://www.python.org/dev/peps/pep-0257/ 25""" 26 27__author__ = "ButenkoMS <gtalk@butenkoms.space>" 28__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 29__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 30__license__ = "Apache License, Version 2.0" 31__version__ = "4.4.1" 32__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 33__email__ = "gtalk@butenkoms.space" 34# __status__ = "Prototype" 35__status__ = "Development" 36# __status__ = "Production" 37 38 39from cengal.parallel_execution.coroutines.coro_scheduler import * 40from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro 41from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro 42from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBus, AsyncEventBusRequest, EventID 43from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest, PutSingleCoroParams, CoroutineNotFoundError, TimeoutError 44from cengal.time_management.cpu_clock_cycles import perf_counter 45from cengal.math.numbers import RationalNumber 46from typing import Optional, Union 47 48 49async def wait_for_event(i: Interface, event: EventID): 50 await i(AsyncEventBus, AsyncEventBusRequest().wait(event)) 51 return True 52 53 54class Lock: 55 def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None: 56 self._locked: bool = False 57 self._event: EventID = event 58 self.timeout: Optional[RationalNumber] = timeout 59 self.parent: Optional['Lock'] = parent 60 self.last_lock_attempt_successful: Union[None, bool] = None 61 62 @property 63 def locked(self): 64 if self.parent is None: 65 return self._locked 66 else: 67 return self.parent.locked 68 69 @locked.setter 70 def locked(self, value: bool): 71 if self.parent is None: 72 self._locked = value 73 else: 74 self.parent.locked = value 75 76 @property 77 def event(self): 78 if self.parent is None: 79 return self._event 80 else: 81 return self.parent.event 82 83 @event.setter 84 def event(self, value: EventID): 85 if self.parent is None: 86 self._event = value 87 else: 88 self.parent.event = value 89 90 def lock(self, timeout: RationalNumber): 91 return Lock(self.event, timeout, self) 92 93 def try_lock(self): 94 return Lock(self.event, 0, self) 95 96 def __enter__(self): 97 if self.locked: 98 if 0 == self.timeout: 99 self.last_lock_attempt_successful = False 100 return False 101 elif self.timeout is None: 102 i: Interface = current_interface() 103 i(RunCoro, wait_for_event, self.event) 104 self.locked = True 105 self.last_lock_attempt_successful = True 106 return True 107 else: 108 i = current_interface() 109 try: 110 need_to_repeat = True 111 time_spend = 0 112 while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)): 113 start_time: float = perf_counter() 114 wait_for_event_coro_id: CoroID = i(PutCoro, wait_for_event, self.event) 115 try: 116 i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id)) 117 except CoroutineNotFoundError: 118 pass 119 120 time_spend += perf_counter() - start_time 121 if not self.locked: 122 self.locked = True 123 self.last_lock_attempt_successful = True 124 need_to_repeat = False 125 return True 126 except TimeoutError: 127 pass 128 129 return False 130 else: 131 self.locked = True 132 self.last_lock_attempt_successful = True 133 return True 134 135 def __exit__(self, type, value: Exception, traceback): 136 self.locked = False 137 i: Interface = current_interface() 138 i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None)) 139 140 async def __aenter__(self): 141 if self.locked: 142 if 0 == self.timeout: 143 self.last_lock_attempt_successful = False 144 return False 145 elif self.timeout is None: 146 i: Interface = current_interface() 147 await i(RunCoro, wait_for_event, self.event) 148 self.locked = True 149 self.last_lock_attempt_successful = True 150 return True 151 else: 152 i = current_interface() 153 try: 154 need_to_repeat = True 155 time_spend = 0 156 while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)): 157 start_time: float = perf_counter() 158 wait_for_event_coro_id: CoroID = await i(PutCoro, wait_for_event, self.event) 159 try: 160 await i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id)) 161 except CoroutineNotFoundError: 162 pass 163 164 time_spend += perf_counter() - start_time 165 if not self.locked: 166 self.locked = True 167 self.last_lock_attempt_successful = True 168 need_to_repeat = False 169 return True 170 except TimeoutError: 171 pass 172 173 return False 174 else: 175 self.locked = True 176 self.last_lock_attempt_successful = True 177 return True 178 179 async def __aexit__(self, type, value, traceback): 180 self.locked = False 181 i: Interface = current_interface() 182 await i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
class
Lock:
55class Lock: 56 def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None: 57 self._locked: bool = False 58 self._event: EventID = event 59 self.timeout: Optional[RationalNumber] = timeout 60 self.parent: Optional['Lock'] = parent 61 self.last_lock_attempt_successful: Union[None, bool] = None 62 63 @property 64 def locked(self): 65 if self.parent is None: 66 return self._locked 67 else: 68 return self.parent.locked 69 70 @locked.setter 71 def locked(self, value: bool): 72 if self.parent is None: 73 self._locked = value 74 else: 75 self.parent.locked = value 76 77 @property 78 def event(self): 79 if self.parent is None: 80 return self._event 81 else: 82 return self.parent.event 83 84 @event.setter 85 def event(self, value: EventID): 86 if self.parent is None: 87 self._event = value 88 else: 89 self.parent.event = value 90 91 def lock(self, timeout: RationalNumber): 92 return Lock(self.event, timeout, self) 93 94 def try_lock(self): 95 return Lock(self.event, 0, self) 96 97 def __enter__(self): 98 if self.locked: 99 if 0 == self.timeout: 100 self.last_lock_attempt_successful = False 101 return False 102 elif self.timeout is None: 103 i: Interface = current_interface() 104 i(RunCoro, wait_for_event, self.event) 105 self.locked = True 106 self.last_lock_attempt_successful = True 107 return True 108 else: 109 i = current_interface() 110 try: 111 need_to_repeat = True 112 time_spend = 0 113 while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)): 114 start_time: float = perf_counter() 115 wait_for_event_coro_id: CoroID = i(PutCoro, wait_for_event, self.event) 116 try: 117 i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id)) 118 except CoroutineNotFoundError: 119 pass 120 121 time_spend += perf_counter() - start_time 122 if not self.locked: 123 self.locked = True 124 self.last_lock_attempt_successful = True 125 need_to_repeat = False 126 return True 127 except TimeoutError: 128 pass 129 130 return False 131 else: 132 self.locked = True 133 self.last_lock_attempt_successful = True 134 return True 135 136 def __exit__(self, type, value: Exception, traceback): 137 self.locked = False 138 i: Interface = current_interface() 139 i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None)) 140 141 async def __aenter__(self): 142 if self.locked: 143 if 0 == self.timeout: 144 self.last_lock_attempt_successful = False 145 return False 146 elif self.timeout is None: 147 i: Interface = current_interface() 148 await i(RunCoro, wait_for_event, self.event) 149 self.locked = True 150 self.last_lock_attempt_successful = True 151 return True 152 else: 153 i = current_interface() 154 try: 155 need_to_repeat = True 156 time_spend = 0 157 while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)): 158 start_time: float = perf_counter() 159 wait_for_event_coro_id: CoroID = await i(PutCoro, wait_for_event, self.event) 160 try: 161 await i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id)) 162 except CoroutineNotFoundError: 163 pass 164 165 time_spend += perf_counter() - start_time 166 if not self.locked: 167 self.locked = True 168 self.last_lock_attempt_successful = True 169 need_to_repeat = False 170 return True 171 except TimeoutError: 172 pass 173 174 return False 175 else: 176 self.locked = True 177 self.last_lock_attempt_successful = True 178 return True 179 180 async def __aexit__(self, type, value, traceback): 181 self.locked = False 182 i: Interface = current_interface() 183 await i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
Lock( event: str, timeout: typing.Union[int, float, NoneType] = None, parent: typing.Union[Lock, NoneType] = None)
56 def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None: 57 self._locked: bool = False 58 self._event: EventID = event 59 self.timeout: Optional[RationalNumber] = timeout 60 self.parent: Optional['Lock'] = parent 61 self.last_lock_attempt_successful: Union[None, bool] = None
parent: Union[Lock, NoneType]