cengal.parallel_execution.coroutines.coro_tools.lock.versions.v_0.lock

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['Lock']
 20
 21
 22"""
 23Module Docstring
 24Docstrings: http://www.python.org/dev/peps/pep-0257/
 25"""
 26
 27__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 28__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 29__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 30__license__ = "Apache License, Version 2.0"
 31__version__ = "4.4.1"
 32__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 33__email__ = "gtalk@butenkoms.space"
 34# __status__ = "Prototype"
 35__status__ = "Development"
 36# __status__ = "Production"
 37
 38
 39from cengal.parallel_execution.coroutines.coro_scheduler import *
 40from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
 41from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
 42from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBus, AsyncEventBusRequest, EventID
 43from cengal.parallel_execution.coroutines.coro_standard_services.wait_coro import WaitCoro, WaitCoroRequest, PutSingleCoroParams, CoroutineNotFoundError, TimeoutError
 44from cengal.time_management.cpu_clock_cycles import perf_counter
 45from cengal.math.numbers import RationalNumber
 46from typing import Optional, Union
 47
 48
 49async def wait_for_event(i: Interface, event: EventID):
 50    await i(AsyncEventBus, AsyncEventBusRequest().wait(event))
 51    return True
 52
 53
 54class Lock:
 55    def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None:
 56        self._locked: bool = False
 57        self._event: EventID = event
 58        self.timeout: Optional[RationalNumber] = timeout
 59        self.parent: Optional['Lock'] = parent
 60        self.last_lock_attempt_successful: Union[None, bool] = None
 61    
 62    @property
 63    def locked(self):
 64        if self.parent is None:
 65            return self._locked
 66        else:
 67            return self.parent.locked
 68    
 69    @locked.setter
 70    def locked(self, value: bool):
 71        if self.parent is None:
 72            self._locked = value
 73        else:
 74            self.parent.locked = value
 75    
 76    @property
 77    def event(self):
 78        if self.parent is None:
 79            return self._event
 80        else:
 81            return self.parent.event
 82    
 83    @event.setter
 84    def event(self, value: EventID):
 85        if self.parent is None:
 86            self._event = value
 87        else:
 88            self.parent.event = value
 89    
 90    def lock(self, timeout: RationalNumber):
 91        return Lock(self.event, timeout, self)
 92    
 93    def try_lock(self):
 94        return Lock(self.event, 0, self)
 95    
 96    def __enter__(self):
 97        if self.locked:
 98            if 0 == self.timeout:
 99                self.last_lock_attempt_successful = False
100                return False
101            elif self.timeout is None:
102                i: Interface = current_interface()
103                i(RunCoro, wait_for_event, self.event)
104                self.locked = True
105                self.last_lock_attempt_successful = True
106                return True
107            else:
108                i = current_interface()
109                try:
110                    need_to_repeat = True
111                    time_spend = 0
112                    while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)):
113                        start_time: float = perf_counter()
114                        wait_for_event_coro_id: CoroID = i(PutCoro, wait_for_event, self.event)
115                        try:
116                            i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id))
117                        except CoroutineNotFoundError:
118                            pass
119
120                        time_spend += perf_counter() - start_time
121                        if not self.locked:
122                            self.locked = True
123                            self.last_lock_attempt_successful = True
124                            need_to_repeat = False
125                            return True
126                except TimeoutError:
127                    pass
128                
129                return False
130        else:
131            self.locked = True
132            self.last_lock_attempt_successful = True
133            return True
134    
135    def __exit__(self, type, value: Exception, traceback):
136        self.locked = False
137        i: Interface = current_interface()
138        i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
139
140    async def __aenter__(self):
141        if self.locked:
142            if 0 == self.timeout:
143                self.last_lock_attempt_successful = False
144                return False
145            elif self.timeout is None:
146                i: Interface = current_interface()
147                await i(RunCoro, wait_for_event, self.event)
148                self.locked = True
149                self.last_lock_attempt_successful = True
150                return True
151            else:
152                i = current_interface()
153                try:
154                    need_to_repeat = True
155                    time_spend = 0
156                    while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)):
157                        start_time: float = perf_counter()
158                        wait_for_event_coro_id: CoroID = await i(PutCoro, wait_for_event, self.event)
159                        try:
160                            await i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id))
161                        except CoroutineNotFoundError:
162                            pass
163
164                        time_spend += perf_counter() - start_time
165                        if not self.locked:
166                            self.locked = True
167                            self.last_lock_attempt_successful = True
168                            need_to_repeat = False
169                            return True
170                except TimeoutError:
171                    pass
172                
173                return False
174        else:
175            self.locked = True
176            self.last_lock_attempt_successful = True
177            return True
178
179    async def __aexit__(self, type, value, traceback):
180        self.locked = False
181        i: Interface = current_interface()
182        await i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
class Lock:
 55class Lock:
 56    def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None:
 57        self._locked: bool = False
 58        self._event: EventID = event
 59        self.timeout: Optional[RationalNumber] = timeout
 60        self.parent: Optional['Lock'] = parent
 61        self.last_lock_attempt_successful: Union[None, bool] = None
 62    
 63    @property
 64    def locked(self):
 65        if self.parent is None:
 66            return self._locked
 67        else:
 68            return self.parent.locked
 69    
 70    @locked.setter
 71    def locked(self, value: bool):
 72        if self.parent is None:
 73            self._locked = value
 74        else:
 75            self.parent.locked = value
 76    
 77    @property
 78    def event(self):
 79        if self.parent is None:
 80            return self._event
 81        else:
 82            return self.parent.event
 83    
 84    @event.setter
 85    def event(self, value: EventID):
 86        if self.parent is None:
 87            self._event = value
 88        else:
 89            self.parent.event = value
 90    
 91    def lock(self, timeout: RationalNumber):
 92        return Lock(self.event, timeout, self)
 93    
 94    def try_lock(self):
 95        return Lock(self.event, 0, self)
 96    
 97    def __enter__(self):
 98        if self.locked:
 99            if 0 == self.timeout:
100                self.last_lock_attempt_successful = False
101                return False
102            elif self.timeout is None:
103                i: Interface = current_interface()
104                i(RunCoro, wait_for_event, self.event)
105                self.locked = True
106                self.last_lock_attempt_successful = True
107                return True
108            else:
109                i = current_interface()
110                try:
111                    need_to_repeat = True
112                    time_spend = 0
113                    while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)):
114                        start_time: float = perf_counter()
115                        wait_for_event_coro_id: CoroID = i(PutCoro, wait_for_event, self.event)
116                        try:
117                            i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id))
118                        except CoroutineNotFoundError:
119                            pass
120
121                        time_spend += perf_counter() - start_time
122                        if not self.locked:
123                            self.locked = True
124                            self.last_lock_attempt_successful = True
125                            need_to_repeat = False
126                            return True
127                except TimeoutError:
128                    pass
129                
130                return False
131        else:
132            self.locked = True
133            self.last_lock_attempt_successful = True
134            return True
135    
136    def __exit__(self, type, value: Exception, traceback):
137        self.locked = False
138        i: Interface = current_interface()
139        i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
140
141    async def __aenter__(self):
142        if self.locked:
143            if 0 == self.timeout:
144                self.last_lock_attempt_successful = False
145                return False
146            elif self.timeout is None:
147                i: Interface = current_interface()
148                await i(RunCoro, wait_for_event, self.event)
149                self.locked = True
150                self.last_lock_attempt_successful = True
151                return True
152            else:
153                i = current_interface()
154                try:
155                    need_to_repeat = True
156                    time_spend = 0
157                    while need_to_repeat and ((self.timeout is None) or (time_spend < self.timeout)):
158                        start_time: float = perf_counter()
159                        wait_for_event_coro_id: CoroID = await i(PutCoro, wait_for_event, self.event)
160                        try:
161                            await i(WaitCoro, WaitCoroRequest(self.timeout - time_spend, kill_on_timeout=True, result_required=True).single(wait_for_event_coro_id))
162                        except CoroutineNotFoundError:
163                            pass
164
165                        time_spend += perf_counter() - start_time
166                        if not self.locked:
167                            self.locked = True
168                            self.last_lock_attempt_successful = True
169                            need_to_repeat = False
170                            return True
171                except TimeoutError:
172                    pass
173                
174                return False
175        else:
176            self.locked = True
177            self.last_lock_attempt_successful = True
178            return True
179
180    async def __aexit__(self, type, value, traceback):
181        self.locked = False
182        i: Interface = current_interface()
183        await i(AsyncEventBus, AsyncEventBusRequest().send_event(self.event, None))
Lock( event: str, timeout: typing.Union[int, float, NoneType] = None, parent: typing.Union[Lock, NoneType] = None)
56    def __init__(self, event: EventID, timeout: Optional[RationalNumber] = None, parent: Optional['Lock'] = None) -> None:
57        self._locked: bool = False
58        self._event: EventID = event
59        self.timeout: Optional[RationalNumber] = timeout
60        self.parent: Optional['Lock'] = parent
61        self.last_lock_attempt_successful: Union[None, bool] = None
timeout: Union[int, float, NoneType]
parent: Union[Lock, NoneType]
last_lock_attempt_successful: Union[NoneType, bool]
locked
63    @property
64    def locked(self):
65        if self.parent is None:
66            return self._locked
67        else:
68            return self.parent.locked
event
77    @property
78    def event(self):
79        if self.parent is None:
80            return self._event
81        else:
82            return self.parent.event
def lock(self, timeout: typing.Union[int, float]):
91    def lock(self, timeout: RationalNumber):
92        return Lock(self.event, timeout, self)
def try_lock(self):
94    def try_lock(self):
95        return Lock(self.event, 0, self)