cengal.parallel_execution.coroutines.integrations.qt.pyqt6.versions.v_0.pyqt6
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = [
    'WrongQtVersion',
    'exec_app',
    'execa',
    'aemit_signal',
    'aemit',
    'by_coro',
    'aby_coro',
    'modal_blocking',
    'amodal_blocking',
    'block_main_loop',
    'ablock_main_loop',
    'modal',
    'amodal',
    'CoroSlot',
    'CSlot',
    'coro_slot_implicit',
    'cslot_implicit',
    'csloti',
    'csi',
    'coro_slot_gly_patched',
    'cslot_gly_patched',
    'cslotglyp',
    'cslotgp',
    'csgp',
    'coro_slot_agly_patched',
    'cslot_agly_patched',
    'cslotaglyp',
    'cslotagp',
    'csagp',
    'coro_slot_explicit',
    'cslot_explicit',
    'cslotex',
    'csex',
    'qt_exec_in_coro',
    'aqt_exec_in_coro',
    'CoroThreadWorker',
    'CoroThreadWithWorker',
]


"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


from cengal.parallel_execution.coroutines.coro_scheduler import CoroScheduler, CoroSchedulerType, Interface, Coro, AnyWorker, current_interface, current_coro_scheduler, cs_coro, cs_acoro
from cengal.parallel_execution.coroutines.coro_standard_services.loop_yield import gly, CoroPriority, gly_patched, agly_patched
from cengal.parallel_execution.coroutines.coro_standard_services.run_coro import RunCoro
from cengal.parallel_execution.coroutines.coro_standard_services.put_coro import PutCoro
from cengal.parallel_execution.coroutines.coro_standard_services.asyncio_loop import AsyncioLoopRequest
from cengal.parallel_execution.coroutines.coro_standard_services.async_event_bus import AsyncEventBusRequest, try_send_async_event
from cengal.parallel_execution.coroutines.coro_standard_services.simple_yield import Yield
from cengal.parallel_execution.coroutines.coro_standard_services.shutdown_loop import ShutdownLoop
from cengal.parallel_execution.coroutines.coro_standard_services.instance import InstanceRequest
from cengal.parallel_execution.coroutines.coro_tools.run_in_loop import run_in_loop
from cengal.parallel_execution.coroutines.integrations.qt.common.exceptions import WrongQtVersion
from cengal.code_flow_control.smart_values import ValueHolder
from cengal.data_generation.id_generator import IDGenerator

from PyQt6.QtCore import pyqtSlot, QObject, Qt, QTimer, pyqtSignal, QThread
from PyQt6.QtWidgets import QApplication

from inspect import signature, Signature
from contextlib import contextmanager, asynccontextmanager
from functools import wraps, update_wrapper, partial
from typing import Optional, Any, Callable, Hashable


# Keys under which exec_app() publishes its two shared flag holders.
YIELD_ALLOWED_EVENT = 'QT_yield_allowed_event'
YIELD_IN_WORK_EVENT = 'QT_yield_in_work_event'
# First element of the per-call event key used by modal()/amodal().
MODAL_RESULT_EVENT = 'QT_modal_result_event'
MODAL_COUNTER = IDGenerator()


def exec_app(app, default_priority: CoroPriority = CoroPriority.normal) -> int:
    """Run ``app.exec()`` while a zero-interval timer keeps yielding to the scheduler.

    Two ``ValueHolder`` flags are registered through ``InstanceRequest().set``
    under ``YIELD_ALLOWED_EVENT`` and ``YIELD_IN_WORK_EVENT`` so that other
    code in this module can switch the periodic yield off and observe when
    the timer callback has noticed it.

    Args:
        app: Must be a ``PyQt6.QtWidgets.QApplication`` instance.
        default_priority: Priority handed to ``gly``.

    Returns:
        The value returned by ``app.exec()``.

    Raises:
        WrongQtVersion: If ``app`` is not a PyQt6 ``QApplication``.
    """
    if not isinstance(app, QApplication):
        raise WrongQtVersion('Qt version is not PyQt6')

    timer = QTimer()
    ly = gly(default_priority)
    yield_allowed: ValueHolder[bool] = ValueHolder(True, True)
    yield_in_work: ValueHolder[bool] = ValueHolder(True, True)

    def yield_func():
        # Each tick records whether it actually yielded, so waiters in
        # block_main_loop() can tell when yielding has really stopped.
        if not yield_allowed.value:
            yield_in_work.value = False
            return

        yield_in_work.value = True
        ly()

    i: Interface = current_interface()
    i(InstanceRequest().set(YIELD_ALLOWED_EVENT, yield_allowed))
    i(InstanceRequest().set(YIELD_IN_WORK_EVENT, yield_in_work))
    timer.timeout.connect(yield_func, Qt.ConnectionType.QueuedConnection)
    timer.start(0)

    def cleanup_callback():
        timer.stop()

    app.aboutToQuit.connect(cleanup_callback)
    return app.exec()


execa = exec_app


async def aemit_signal(signal, *args, **kwargs):
    """Emit ``signal`` with the given arguments from a coroutine started via ``RunCoro``."""
    i: Interface = current_interface()

    def coro(i: Interface):
        signal.emit(*args, **kwargs)

    return await i(RunCoro, coro)


aemit = aemit_signal


def by_coro(callable: Callable, *args, **kwargs):
    """Call ``callable(*args, **kwargs)`` inside a coroutine started via ``RunCoro``.

    The wrapper coroutine does not return the callable's result; the value
    returned here is whatever the ``RunCoro`` request yields.
    """
    def coro(i: Interface):
        callable(*args, **kwargs)

    return current_interface()(RunCoro, coro)


async def aby_coro(callable: Callable, *args, **kwargs):
    """Awaitable counterpart of :func:`by_coro`."""
    def coro(i: Interface):
        callable(*args, **kwargs)

    return await current_interface()(RunCoro, coro)


def stop_yield_to_main_loop():
    """Switch the periodic yield off and wait until the timer callback has observed it.

    Unlike :func:`block_main_loop`, the ``yield_allowed`` flag is not restored.
    """
    i: Interface = current_interface()
    yield_allowed: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_ALLOWED_EVENT))
    yield_allowed.value = False
    yield_in_work: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_IN_WORK_EVENT))
    while yield_in_work.value:
        i(Yield)


@contextmanager
def block_main_loop():
    """Context manager that disables the periodic yield for the duration of the body.

    On entry ``yield_allowed`` is set to False and the caller yields until
    ``yield_in_work`` becomes False. On exit ``yield_allowed`` is set back to
    True, including when the wait phase itself raises.
    """
    i: Interface = current_interface()
    yield_allowed: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_ALLOWED_EVENT))
    yield_allowed.value = False
    try:
        # The wait is inside the try so a failure here cannot leave the flag
        # stuck at False.
        yield_in_work: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_IN_WORK_EVENT))
        while yield_in_work.value:
            i(Yield)

        yield
    finally:
        yield_allowed.value = True


@asynccontextmanager
async def ablock_main_loop():
    """Async counterpart of :func:`block_main_loop` with the same flag handling."""
    i: Interface = current_interface()
    yield_allowed: ValueHolder[bool] = await i(InstanceRequest().wait(YIELD_ALLOWED_EVENT))
    yield_allowed.value = False
    try:
        # The wait is inside the try so a failure here cannot leave the flag
        # stuck at False.
        yield_in_work: ValueHolder[bool] = await i(InstanceRequest().wait(YIELD_IN_WORK_EVENT))
        while yield_in_work.value:
            await i(Yield)

        yield
    finally:
        yield_allowed.value = True


def modal_blocking(modal_obj, *args, **kwargs):
    """Call ``modal_obj(*args, **kwargs)`` inside :func:`block_main_loop` and return its result."""
    with block_main_loop():
        return modal_obj(*args, **kwargs)


async def amodal_blocking(modal_obj, *args, **kwargs):
    """Call ``modal_obj(*args, **kwargs)`` inside :func:`ablock_main_loop` and return its result."""
    async with ablock_main_loop():
        return modal_obj(*args, **kwargs)


def modal(callable_with_modal, *args, **kwargs):
    """Run ``callable_with_modal(*args, **kwargs)`` from a queued Qt slot and return its result.

    A throwaway ``QObject`` emits a queued signal to itself; its slot calls
    the callable and sends the result with ``try_send_async_event`` under a
    unique ``(MODAL_RESULT_EVENT, id)`` key, which this function waits for.
    """
    class ShowModal(QObject):
        signal = pyqtSignal()

        def __init__(self, cs: CoroSchedulerType, result_event: Hashable):
            super().__init__()
            self.cs: CoroSchedulerType = cs
            self.result_event: Hashable = result_event
            self.signal.connect(self.show_modal, Qt.ConnectionType.QueuedConnection)

        @pyqtSlot()
        def show_modal(self):
            result = callable_with_modal(*args, **kwargs)
            try_send_async_event(self.cs, self.result_event, result)

    event = (MODAL_RESULT_EVENT, MODAL_COUNTER())
    sm: ShowModal = ShowModal(current_coro_scheduler(), event)
    sm.signal.emit()
    i: Interface = current_interface()
    return i(AsyncEventBusRequest().wait(event))


async def amodal(callable_with_modal, *args, **kwargs):
    """Awaitable counterpart of :func:`modal`."""
    class ShowModal(QObject):
        signal = pyqtSignal()

        def __init__(self, cs: CoroSchedulerType, result_event: Hashable):
            super().__init__()
            self.cs: CoroSchedulerType = cs
            self.result_event: Hashable = result_event
            self.signal.connect(self.show_modal, Qt.ConnectionType.QueuedConnection)

        @pyqtSlot()
        def show_modal(self):
            result = callable_with_modal(*args, **kwargs)
            try_send_async_event(self.cs, self.result_event, result)

    event = (MODAL_RESULT_EVENT, MODAL_COUNTER())
    sm: ShowModal = ShowModal(current_coro_scheduler(), event)
    sm.signal.emit()
    i: Interface = current_interface()
    return await i(AsyncEventBusRequest().wait(event))


# class CoroSlot(QObject):
class CoroSlot:
    """Decorator factory that turns a coroutine or function into a ``pyqtSlot``.

    The produced slot does not run the target directly: each invocation hands
    the target and the slot arguments to the scheduler's ``PutCoro`` service
    through ``_add_direct_request``.
    """

    def __init__(self, *types: type, name: Optional[str] = None, result: Optional[str] = None) -> None:
        self._types = types
        self._name = name
        self._result = result
        self._coro = None

    def __call__(self, coro: Coro, method = None) -> Any:
        """Decorate ``coro`` with ``method``; defaults to :meth:`implicit_coro_impl`."""
        method = method or self.implicit_coro_impl
        return method(coro)

    def _as_pyqt_slot(self, func_wrapper: Callable, coro: Coro, skip_leading: int) -> Any:
        """Copy metadata from ``coro`` onto ``func_wrapper`` and register it as a slot.

        ``skip_leading`` is the number of leading parameters of ``coro`` to
        hide from the advertised signature.
        """
        coro_worker_sign: Signature = signature(coro)
        update_wrapper(func_wrapper, coro)
        func_wrapper.__signature__ = coro_worker_sign.replace(
            parameters=tuple(coro_worker_sign.parameters.values())[skip_leading:],
            return_annotation=coro_worker_sign.return_annotation,
        )
        # name/result are keyword-only options of pyqtSlot; passed positionally
        # they would be parsed as slot argument types. Unset ones are omitted.
        slot_options = {}
        if self._name is not None:
            slot_options['name'] = self._name
        if self._result is not None:
            slot_options['result'] = self._result

        return pyqtSlot(*self._types, **slot_options)(func_wrapper)

    def implicit_coro_impl(self, coro: Coro) -> Any:
        """Build a slot for a target that does not take the interface argument itself.

        The target is wrapped with ``cs_coro`` on every slot invocation.
        """
        self._coro = coro

        def func_wrapper(*args, **kwargs):
            cs: CoroSchedulerType = current_coro_scheduler()
            service: PutCoro = cs.get_service_instance(PutCoro)
            return service._add_direct_request(cs_coro(coro), *args, **kwargs)

        return self._as_pyqt_slot(func_wrapper, coro, 0)

    def implicit_coro(self):
        return partial(self, method=self.implicit_coro_impl)

    ic = implicit_coro

    def gly_patched_function_impl(self, func: Callable) -> Any:
        return self.__call__(gly_patched(func))

    def gly_patched_function(self) -> Any:
        return partial(self, method=self.gly_patched_function_impl)

    glypf = gly_patched_function
    gpf = gly_patched_function
    gp = gly_patched_function

    def agly_patched_function_impl(self, afunc: Callable) -> Any:
        return self.__call__(agly_patched(afunc))

    def agly_patched_function(self) -> Any:
        return partial(self, method=self.agly_patched_function_impl)

    aglypf = agly_patched_function
    agpf = agly_patched_function
    agp = agly_patched_function

    def explicit_coro_impl(self, coro: Coro) -> Any:
        """Build a slot for a target that is passed to ``PutCoro`` as is.

        The first parameter of ``coro`` is dropped from the advertised slot
        signature.
        """
        self._coro = coro

        def func_wrapper(*args, **kwargs):
            cs: CoroSchedulerType = current_coro_scheduler()
            service: PutCoro = cs.get_service_instance(PutCoro)
            return service._add_direct_request(coro, *args, **kwargs)

        return self._as_pyqt_slot(func_wrapper, coro, 1)

    def explicit_coro(self) -> Any:
        return partial(self, method=self.explicit_coro_impl)

    ec = explicit_coro
    interface = explicit_coro
    i = interface


CSlot = CoroSlot


def coro_slot_implicit(*types: type, name: Optional[str] = None, result: Optional[str] = None):
    """Decorator form of ``CoroSlot(...).implicit_coro()``."""
    def decorator(coro: Coro) -> Any:
        return CoroSlot(*types, name=name, result=result).implicit_coro()(coro)

    return decorator

cslot_implicit = coro_slot_implicit
csloti = coro_slot_implicit
csi = coro_slot_implicit


def coro_slot_gly_patched(*types: type, name: Optional[str] = None, result: Optional[str] = None):
    """Decorator form of ``CoroSlot(...).gly_patched_function()``."""
    def decorator(func: Callable) -> Any:
        return CoroSlot(*types, name=name, result=result).gly_patched_function()(func)

    return decorator

cslot_gly_patched = coro_slot_gly_patched
cslotglyp = coro_slot_gly_patched
cslotgp = coro_slot_gly_patched
csgp = coro_slot_gly_patched


def coro_slot_agly_patched(*types: type, name: Optional[str] = None, result: Optional[str] = None):
    """Decorator form of ``CoroSlot(...).agly_patched_function()``."""
    def decorator(afunc: Callable) -> Any:
        return CoroSlot(*types, name=name, result=result).agly_patched_function()(afunc)

    return decorator

cslot_agly_patched = coro_slot_agly_patched
cslotaglyp = coro_slot_agly_patched
cslotagp = coro_slot_agly_patched
# Fixed: this alias used to point at coro_slot_gly_patched (the non-async variant).
csagp = coro_slot_agly_patched


def coro_slot_explicit(*types: type, name: Optional[str] = None, result: Optional[str] = None):
    """Decorator form of ``CoroSlot(...).explicit_coro()``."""
    def decorator(coro: Coro) -> Any:
        return CoroSlot(*types, name=name, result=result).explicit_coro()(coro)

    return decorator

cslot_explicit = coro_slot_explicit
cslotex = coro_slot_explicit
csex = coro_slot_explicit


def qt_exec_in_coro(func: Callable, default_priority: CoroPriority = CoroPriority.normal) -> Callable:
    """Wrap an application factory so that the Qt event loop runs inside a coroutine.

    ``func`` must return either a ``QApplication`` or a
    ``(QApplication, on_exit)`` tuple. The wrapper runs ``execa`` for the
    application, passes its return code through ``on_exit``, requests
    ``ShutdownLoop`` and returns the value produced by ``on_exit``.

    Raises (from the wrapper):
        RuntimeError: If ``func`` returns neither of the accepted shapes.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        cs: CoroSchedulerType = current_coro_scheduler()
        cs.high_cpu_utilisation_mode = False
        cs.use_internal_sleep = False
        i: Interface = current_interface()
        i(AsyncioLoopRequest().use_higher_level_sleep_manager())
        i(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        i(AsyncioLoopRequest().turn_on_loops_intercommunication())

        app_or_tuple = func(*args, **kwargs)
        on_exit = lambda ret: ret
        if isinstance(app_or_tuple, QApplication):
            app = app_or_tuple
        elif isinstance(app_or_tuple, tuple):
            app, on_exit = app_or_tuple
        else:
            raise RuntimeError("qt_exec_in_coro must return either QApplication or (QApplication, on_exit) tuple")

        # Run the event loop
        ret = i(RunCoro, cs_coro(execa), app, default_priority)
        ret = i(RunCoro, cs_coro(on_exit), ret)
        i(ShutdownLoop)
        return ret

    return cs_coro(wrapper)


async def aqt_exec_in_coro(func: Callable, default_priority: CoroPriority = CoroPriority.normal) -> Callable:
    """Async-flavoured counterpart of :func:`qt_exec_in_coro`.

    NOTE(review): this is an ``async def``, so calling it yields an awaitable
    rather than the wrapped callable directly - confirm that is intended.
    NOTE(review): unlike qt_exec_in_coro, the wrapper neither adjusts the
    scheduler sleep settings nor validates the shape of ``func``'s result.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        i: Interface = current_interface()
        await i(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        await i(AsyncioLoopRequest().turn_on_loops_intercommunication())

        app_or_tuple = func(*args, **kwargs)
        on_exit = lambda ret: ret
        if isinstance(app_or_tuple, QApplication):
            app = app_or_tuple
        else:
            app, on_exit = app_or_tuple

        ret = await i(RunCoro, cs_coro(execa), app, default_priority)
        ret = await i(RunCoro, cs_coro(on_exit), ret)
        await i(ShutdownLoop)
        return ret

    return cs_acoro(wrapper)


class CoroThreadWorker(QObject):
    """QObject whose :meth:`run` executes a worker inside its own ``run_in_loop`` call."""

    def __init__(self, worker: AnyWorker) -> None:
        super().__init__()
        self._worker: AnyWorker = worker
        # Filled in by run() once the loop is up; stop() needs it.
        self._cs: CoroSchedulerType = None
        self.allowed_to_run = False

    def run(self):
        """Start a loop via ``run_in_loop``, run :meth:`setup` and then the worker."""
        self.allowed_to_run = True

        async def my_worker_wrapper(i: Interface, worker):
            self._cs = current_coro_scheduler()
            await i(RunCoro, self.setup)
            await i(RunCoro, worker)

        run_in_loop(my_worker_wrapper, self._worker)

    async def setup(self, i: Interface):
        await i(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        await i(AsyncioLoopRequest().turn_on_loops_intercommunication())

    def stop(self):
        """Queue a ``ShutdownLoop`` request on the worker's scheduler.

        Does nothing when :meth:`run` was never called, when stop was already
        requested, or when the scheduler has not been captured yet.
        """
        if not self.allowed_to_run:
            return

        self.allowed_to_run = False

        async def coro(i: Interface):
            await i(ShutdownLoop)

        if self._cs is not None:
            service: PutCoro = self._cs.get_service_instance(PutCoro)
            return service._add_direct_request(coro)


class CoroThread(QThread):
    """QThread that adopts a :class:`CoroThreadWorker` and runs it when started."""

    def __init__(self, parent, worker: CoroThreadWorker) -> None:
        super().__init__(parent)
        self._worker: CoroThreadWorker = worker
        self._worker.moveToThread(self)
        self.started.connect(self._worker.run)

    def stop(self):
        self.quit()
        self.wait()


class CoroThreadWithWorker(CoroThreadWorker):
    """Worker bundled with the :class:`CoroThread` that hosts it."""

    def __init__(self, parent, worker: AnyWorker) -> None:
        super().__init__(worker)
        self._thread: CoroThread = CoroThread(parent, self)

    def start(self):
        self._thread.start()

    def stop(self):
        # Ask the coroutine loop to shut down first, then quit and join the thread.
        super().stop()
        self._thread.stop()
class
WrongQtVersion(builtins.Exception):
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
def
exec_app( app, default_priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority = <CoroPriority.normal: 1>) -> int:
def exec_app(app, default_priority: CoroPriority = CoroPriority.normal) -> int:
    """Run ``app.exec()`` while a zero-interval timer keeps yielding to the scheduler.

    Two ``ValueHolder`` flags are registered through ``InstanceRequest().set``
    under ``YIELD_ALLOWED_EVENT`` and ``YIELD_IN_WORK_EVENT``; the timer
    callback consults the first and reports into the second.

    Args:
        app: Must be a ``PyQt6.QtWidgets.QApplication`` instance.
        default_priority: Priority handed to ``gly``.

    Returns:
        The value returned by ``app.exec()``.

    Raises:
        WrongQtVersion: If ``app`` is not a PyQt6 ``QApplication``.
    """
    if not isinstance(app, QApplication):
        raise WrongQtVersion('Qt version is not PyQt6')

    tick_timer = QTimer()
    loop_yield = gly(default_priority)
    may_yield: ValueHolder[bool] = ValueHolder(True, True)
    yielding_now: ValueHolder[bool] = ValueHolder(True, True)

    def on_tick():
        # Report whether this tick yielded, then yield only when permitted.
        if not may_yield.value:
            yielding_now.value = False
            return

        yielding_now.value = True
        loop_yield()

    interface: Interface = current_interface()
    interface(InstanceRequest().set(YIELD_ALLOWED_EVENT, may_yield))
    interface(InstanceRequest().set(YIELD_IN_WORK_EVENT, yielding_now))
    tick_timer.timeout.connect(on_tick, Qt.ConnectionType.QueuedConnection)
    tick_timer.start(0)

    def stop_timer():
        tick_timer.stop()

    app.aboutToQuit.connect(stop_timer)
    return app.exec()
def
execa( app, default_priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority = <CoroPriority.normal: 1>) -> int:
def exec_app(app, default_priority: CoroPriority = CoroPriority.normal) -> int:
    """Run ``app.exec()`` while a zero-interval timer keeps yielding to the scheduler.

    ``execa`` is an alias of this function. Two ``ValueHolder`` flags are
    registered through ``InstanceRequest().set`` under ``YIELD_ALLOWED_EVENT``
    and ``YIELD_IN_WORK_EVENT``; the timer callback consults the first and
    reports into the second.

    Args:
        app: Must be a ``PyQt6.QtWidgets.QApplication`` instance.
        default_priority: Priority handed to ``gly``.

    Returns:
        The value returned by ``app.exec()``.

    Raises:
        WrongQtVersion: If ``app`` is not a PyQt6 ``QApplication``.
    """
    if not isinstance(app, QApplication):
        raise WrongQtVersion('Qt version is not PyQt6')

    tick_timer = QTimer()
    loop_yield = gly(default_priority)
    may_yield: ValueHolder[bool] = ValueHolder(True, True)
    yielding_now: ValueHolder[bool] = ValueHolder(True, True)

    def on_tick():
        # Report whether this tick yielded, then yield only when permitted.
        if not may_yield.value:
            yielding_now.value = False
            return

        yielding_now.value = True
        loop_yield()

    interface: Interface = current_interface()
    interface(InstanceRequest().set(YIELD_ALLOWED_EVENT, may_yield))
    interface(InstanceRequest().set(YIELD_IN_WORK_EVENT, yielding_now))
    tick_timer.timeout.connect(on_tick, Qt.ConnectionType.QueuedConnection)
    tick_timer.start(0)

    def stop_timer():
        tick_timer.stop()

    app.aboutToQuit.connect(stop_timer)
    return app.exec()
async def
aemit_signal(signal, *args, **kwargs):
async def
aemit(signal, *args, **kwargs):
def
by_coro(callable: typing.Callable, *args, **kwargs):
async def
aby_coro(callable: typing.Callable, *args, **kwargs):
def
modal_blocking(modal_obj, *args, **kwargs):
async def
amodal_blocking(modal_obj, *args, **kwargs):
@contextmanager
def
block_main_loop():
@contextmanager
def block_main_loop():
    """Context manager that disables the periodic yield for the duration of the body.

    On entry the holder registered under ``YIELD_ALLOWED_EVENT`` is set to
    False and the caller keeps issuing ``Yield`` requests until the holder
    registered under ``YIELD_IN_WORK_EVENT`` reads False. On exit
    ``yield_allowed`` is set back to True. The restore also happens when the
    wait phase raises, so a failure on entry cannot leave yielding disabled.
    """
    i: Interface = current_interface()
    yield_allowed: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_ALLOWED_EVENT))
    yield_allowed.value = False
    try:
        yield_in_work: ValueHolder[bool] = i(InstanceRequest().wait(YIELD_IN_WORK_EVENT))
        while yield_in_work.value:
            i(Yield)

        yield
    finally:
        yield_allowed.value = True
@asynccontextmanager
async def
ablock_main_loop():
@asynccontextmanager
async def ablock_main_loop():
    """Async context manager that disables the periodic yield for the duration of the body.

    On entry the holder registered under ``YIELD_ALLOWED_EVENT`` is set to
    False and the caller keeps awaiting ``Yield`` requests until the holder
    registered under ``YIELD_IN_WORK_EVENT`` reads False. On exit
    ``yield_allowed`` is set back to True. The restore also happens when the
    wait phase raises, so a failure on entry cannot leave yielding disabled.
    """
    i: Interface = current_interface()
    yield_allowed: ValueHolder[bool] = await i(InstanceRequest().wait(YIELD_ALLOWED_EVENT))
    yield_allowed.value = False
    try:
        yield_in_work: ValueHolder[bool] = await i(InstanceRequest().wait(YIELD_IN_WORK_EVENT))
        while yield_in_work.value:
            await i(Yield)

        yield
    finally:
        yield_allowed.value = True
def
modal(callable_with_modal, *args, **kwargs):
def modal(callable_with_modal, *args, **kwargs):
    """Run ``callable_with_modal(*args, **kwargs)`` from a queued Qt slot and return its result.

    A throwaway ``QObject`` emits a queued signal to itself. Its slot calls
    the callable and publishes the result with ``try_send_async_event`` under
    a unique ``(MODAL_RESULT_EVENT, id)`` key; this function then waits for
    that event through ``AsyncEventBusRequest``.

    NOTE(review): if the callable raises inside the slot, no event is sent
    here - confirm how callers are expected to recover.
    """
    class ShowModal(QObject):
        signal = pyqtSignal()

        def __init__(self, scheduler: CoroSchedulerType, event_key: Hashable):
            super().__init__()
            self.cs: CoroSchedulerType = scheduler
            self.result_event: Hashable = event_key
            # Queued, so the slot runs from the Qt event loop rather than inline.
            self.signal.connect(self.show_modal, Qt.ConnectionType.QueuedConnection)

        @pyqtSlot()
        def show_modal(self):
            outcome = callable_with_modal(*args, **kwargs)
            try_send_async_event(self.cs, self.result_event, outcome)

    result_key = (MODAL_RESULT_EVENT, MODAL_COUNTER())
    invoker: ShowModal = ShowModal(current_coro_scheduler(), result_key)
    invoker.signal.emit()
    return current_interface()(AsyncEventBusRequest().wait(result_key))
async def
amodal(callable_with_modal, *args, **kwargs):
async def amodal(callable_with_modal, *args, **kwargs):
    """Awaitable: run ``callable_with_modal(*args, **kwargs)`` from a queued Qt slot.

    A throwaway ``QObject`` emits a queued signal to itself. Its slot calls
    the callable and publishes the result with ``try_send_async_event`` under
    a unique ``(MODAL_RESULT_EVENT, id)`` key; this coroutine then awaits
    that event through ``AsyncEventBusRequest`` and returns it.

    NOTE(review): if the callable raises inside the slot, no event is sent
    here - confirm how callers are expected to recover.
    """
    class ShowModal(QObject):
        signal = pyqtSignal()

        def __init__(self, scheduler: CoroSchedulerType, event_key: Hashable):
            super().__init__()
            self.cs: CoroSchedulerType = scheduler
            self.result_event: Hashable = event_key
            # Queued, so the slot runs from the Qt event loop rather than inline.
            self.signal.connect(self.show_modal, Qt.ConnectionType.QueuedConnection)

        @pyqtSlot()
        def show_modal(self):
            outcome = callable_with_modal(*args, **kwargs)
            try_send_async_event(self.cs, self.result_event, outcome)

    result_key = (MODAL_RESULT_EVENT, MODAL_COUNTER())
    invoker: ShowModal = ShowModal(current_coro_scheduler(), result_key)
    invoker.signal.emit()
    return await current_interface()(AsyncEventBusRequest().wait(result_key))
class
CoroSlot:
class CoroSlot:
    """Decorator factory that turns a coroutine or function into a ``pyqtSlot``.

    The produced slot does not run the target directly: each invocation hands
    the target and the slot arguments to the scheduler's ``PutCoro`` service
    through ``_add_direct_request``.

    Args:
        *types: Slot argument types forwarded to ``pyqtSlot``.
        name: Optional slot name forwarded to ``pyqtSlot`` as ``name=``.
        result: Optional result type forwarded to ``pyqtSlot`` as ``result=``.
    """

    def __init__(self, *types: type, name: Optional[str] = None, result: Optional[str] = None) -> None:
        self._types = types
        self._name = name
        self._result = result
        self._coro = None

    def __call__(self, coro: Coro, method = None) -> Any:
        """Decorate ``coro`` with ``method``; defaults to :meth:`implicit_coro_impl`."""
        method = method or self.implicit_coro_impl
        return method(coro)

    def _as_pyqt_slot(self, func_wrapper: Callable, coro: Coro, skip_leading: int) -> Any:
        """Copy metadata from ``coro`` onto ``func_wrapper`` and register it as a slot.

        ``skip_leading`` is the number of leading parameters of ``coro`` to
        hide from the advertised signature.
        """
        coro_worker_sign: Signature = signature(coro)
        update_wrapper(func_wrapper, coro)
        func_wrapper.__signature__ = coro_worker_sign.replace(
            parameters=tuple(coro_worker_sign.parameters.values())[skip_leading:],
            return_annotation=coro_worker_sign.return_annotation,
        )
        # name/result are keyword-only options of pyqtSlot; passed positionally
        # they would be parsed as slot argument types. Unset ones are omitted.
        slot_options = {}
        if self._name is not None:
            slot_options['name'] = self._name
        if self._result is not None:
            slot_options['result'] = self._result

        return pyqtSlot(*self._types, **slot_options)(func_wrapper)

    def implicit_coro_impl(self, coro: Coro) -> Any:
        """Build a slot for a target that does not take the interface argument itself.

        The target is wrapped with ``cs_coro`` on every slot invocation.
        """
        self._coro = coro

        def func_wrapper(*args, **kwargs):
            cs: CoroSchedulerType = current_coro_scheduler()
            service: PutCoro = cs.get_service_instance(PutCoro)
            return service._add_direct_request(cs_coro(coro), *args, **kwargs)

        return self._as_pyqt_slot(func_wrapper, coro, 0)

    def implicit_coro(self):
        """Return a decorator bound to :meth:`implicit_coro_impl`."""
        return partial(self, method=self.implicit_coro_impl)

    ic = implicit_coro

    def gly_patched_function_impl(self, func: Callable) -> Any:
        """Wrap ``func`` with ``gly_patched`` and decorate it through ``__call__``."""
        return self.__call__(gly_patched(func))

    def gly_patched_function(self) -> Any:
        """Return a decorator bound to :meth:`gly_patched_function_impl`."""
        return partial(self, method=self.gly_patched_function_impl)

    glypf = gly_patched_function
    gpf = gly_patched_function
    gp = gly_patched_function

    def agly_patched_function_impl(self, afunc: Callable) -> Any:
        """Wrap ``afunc`` with ``agly_patched`` and decorate it through ``__call__``."""
        return self.__call__(agly_patched(afunc))

    def agly_patched_function(self) -> Any:
        """Return a decorator bound to :meth:`agly_patched_function_impl`."""
        return partial(self, method=self.agly_patched_function_impl)

    aglypf = agly_patched_function
    agpf = agly_patched_function
    agp = agly_patched_function

    def explicit_coro_impl(self, coro: Coro) -> Any:
        """Build a slot for a target that is passed to ``PutCoro`` as is.

        The first parameter of ``coro`` is dropped from the advertised slot
        signature.
        """
        self._coro = coro

        def func_wrapper(*args, **kwargs):
            cs: CoroSchedulerType = current_coro_scheduler()
            service: PutCoro = cs.get_service_instance(PutCoro)
            return service._add_direct_request(coro, *args, **kwargs)

        return self._as_pyqt_slot(func_wrapper, coro, 1)

    def explicit_coro(self) -> Any:
        """Return a decorator bound to :meth:`explicit_coro_impl`."""
        return partial(self, method=self.explicit_coro_impl)

    ec = explicit_coro
    interface = explicit_coro
    i = interface
CoroSlot( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None)
def
implicit_coro_impl( self, coro: typing.Union[greenlet.greenlet, typing.Awaitable, typing.Coroutine, typing.Generator, typing.AsyncGenerator, typing.Callable]) -> Any:
def implicit_coro_impl(self, coro: Coro) -> Any:
    """Build a ``pyqtSlot`` for a target that does not take the interface argument itself.

    Each slot invocation wraps ``coro`` with ``cs_coro`` and hands it, with
    the slot arguments, to the scheduler's ``PutCoro`` service.

    Args:
        coro: The target to schedule; also stored on ``self._coro``.

    Returns:
        The wrapper function after decoration by ``pyqtSlot``.
    """
    self._coro = coro

    def func_wrapper(*args, **kwargs):
        cs: CoroSchedulerType = current_coro_scheduler()
        service: PutCoro = cs.get_service_instance(PutCoro)
        return service._add_direct_request(cs_coro(coro), *args, **kwargs)

    coro_worker_sign: Signature = signature(coro)
    update_wrapper(func_wrapper, coro)
    func_wrapper.__signature__ = coro_worker_sign.replace(
        parameters=tuple(coro_worker_sign.parameters.values()),
        return_annotation=coro_worker_sign.return_annotation,
    )
    # name/result are keyword-only options of pyqtSlot; passed positionally
    # they would be parsed as slot argument types. Unset ones are omitted.
    slot_options = {}
    if self._name is not None:
        slot_options['name'] = self._name
    if self._result is not None:
        slot_options['result'] = self._result

    return pyqtSlot(*self._types, **slot_options)(func_wrapper)
def
explicit_coro_impl( self, coro: typing.Union[greenlet.greenlet, typing.Awaitable, typing.Coroutine, typing.Generator, typing.AsyncGenerator, typing.Callable]) -> Any:
def explicit_coro_impl(self, coro: Coro) -> Any:
    """Build a ``pyqtSlot`` for a target that is passed to ``PutCoro`` as is.

    Each slot invocation hands ``coro`` and the slot arguments to the
    scheduler's ``PutCoro`` service. The first parameter of ``coro`` is
    dropped from the advertised slot signature.

    Args:
        coro: The target to schedule; also stored on ``self._coro``.

    Returns:
        The wrapper function after decoration by ``pyqtSlot``.
    """
    self._coro = coro

    def func_wrapper(*args, **kwargs):
        cs: CoroSchedulerType = current_coro_scheduler()
        service: PutCoro = cs.get_service_instance(PutCoro)
        return service._add_direct_request(coro, *args, **kwargs)

    coro_worker_sign: Signature = signature(coro)
    update_wrapper(func_wrapper, coro)
    func_wrapper.__signature__ = coro_worker_sign.replace(
        parameters=tuple(coro_worker_sign.parameters.values())[1:],
        return_annotation=coro_worker_sign.return_annotation,
    )
    # name/result are keyword-only options of pyqtSlot; passed positionally
    # they would be parsed as slot argument types. Unset ones are omitted.
    slot_options = {}
    if self._name is not None:
        slot_options['name'] = self._name
    if self._result is not None:
        slot_options['result'] = self._result

    return pyqtSlot(*self._types, **slot_options)(func_wrapper)
CSlot =
<class 'CoroSlot'>
def
coro_slot_implicit( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslot_implicit( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
csloti( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
csi( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
coro_slot_gly_patched( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslot_gly_patched( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslotglyp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslotgp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
csgp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
coro_slot_agly_patched( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslot_agly_patched( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslotaglyp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslotagp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
csagp( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
coro_slot_explicit( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslot_explicit( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
cslotex( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
csex( *types: type, name: typing.Union[str, NoneType] = None, result: typing.Union[str, NoneType] = None):
def
qt_exec_in_coro( func: typing.Callable, default_priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority = <CoroPriority.normal: 1>) -> Callable:
def qt_exec_in_coro(func: Callable, default_priority: CoroPriority = CoroPriority.normal) -> Callable:
    """Wrap an application factory so that the Qt event loop runs inside a coroutine.

    ``func`` must return either a ``QApplication`` or a
    ``(QApplication, on_exit)`` tuple. The wrapper turns off the scheduler's
    ``high_cpu_utilisation_mode`` and ``use_internal_sleep`` flags, issues
    three ``AsyncioLoopRequest`` calls, runs ``execa`` for the application,
    passes its return code through ``on_exit``, requests ``ShutdownLoop`` and
    returns the value produced by ``on_exit``.

    Args:
        func: Factory producing the application (and optionally ``on_exit``).
        default_priority: Priority forwarded to ``execa``.

    Returns:
        The wrapper, passed through ``cs_coro``.

    Raises (from the wrapper):
        RuntimeError: If ``func`` returns neither of the accepted shapes.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        scheduler: CoroSchedulerType = current_coro_scheduler()
        scheduler.high_cpu_utilisation_mode = False
        scheduler.use_internal_sleep = False

        interface: Interface = current_interface()
        interface(AsyncioLoopRequest().use_higher_level_sleep_manager())
        interface(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        interface(AsyncioLoopRequest().turn_on_loops_intercommunication())

        produced = func(*args, **kwargs)
        if isinstance(produced, QApplication):
            # No exit hook supplied: pass the return code through unchanged.
            app = produced
            on_exit = lambda ret: ret
        elif isinstance(produced, tuple):
            app, on_exit = produced
        else:
            raise RuntimeError("qt_exec_in_coro must return either QApplication or (QApplication, on_exit) tuple")

        # Run the event loop, then give the exit hook a chance to map the code.
        exit_code = interface(RunCoro, cs_coro(execa), app, default_priority)
        final_result = interface(RunCoro, cs_coro(on_exit), exit_code)
        interface(ShutdownLoop)
        return final_result

    return cs_coro(wrapper)
async def
aqt_exec_in_coro( func: typing.Callable, default_priority: cengal.parallel_execution.coroutines.coro_standard_services.loop_yield.versions.v_0.loop_yield.CoroPriority = <CoroPriority.normal: 1>) -> Callable:
async def aqt_exec_in_coro(func: Callable, default_priority: CoroPriority = CoroPriority.normal) -> Callable:
    """Async-flavoured counterpart of ``qt_exec_in_coro``.

    ``func`` is expected to return either a ``QApplication`` or something
    that unpacks into ``(app, on_exit)``. The async wrapper issues two
    ``AsyncioLoopRequest`` calls, runs ``execa`` for the application, passes
    its return code through ``on_exit``, requests ``ShutdownLoop`` and
    returns the value produced by ``on_exit``.

    NOTE(review): this is an ``async def``, so calling it yields an awaitable
    rather than the wrapped callable directly - confirm that is intended.
    NOTE(review): unlike qt_exec_in_coro, the wrapper neither adjusts the
    scheduler sleep settings nor validates the shape of ``func``'s result.

    Returns:
        The wrapper, passed through ``cs_acoro``.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        interface: Interface = current_interface()
        await interface(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        await interface(AsyncioLoopRequest().turn_on_loops_intercommunication())

        produced = func(*args, **kwargs)
        if isinstance(produced, QApplication):
            # No exit hook supplied: pass the return code through unchanged.
            app = produced
            on_exit = lambda ret: ret
        else:
            app, on_exit = produced

        exit_code = await interface(RunCoro, cs_coro(execa), app, default_priority)
        final_result = await interface(RunCoro, cs_coro(on_exit), exit_code)
        await interface(ShutdownLoop)
        return final_result

    return cs_acoro(wrapper)
class
CoroThreadWorker(PyQt6.QtCore.QObject):
class CoroThreadWorker(QObject):
    """QObject whose :meth:`run` executes a worker inside its own ``run_in_loop`` call.

    NOTE(review): ``stop`` talks to the scheduler captured by ``run``;
    presumably it is called from a different thread than ``run`` - confirm
    that ``_add_direct_request`` is safe for that use.
    """

    def __init__(self, worker: AnyWorker) -> None:
        super().__init__()
        self._worker: AnyWorker = worker
        # Captured by run() once the loop is up; stop() needs it.
        self._cs: CoroSchedulerType = None
        self.allowed_to_run = False

    def run(self):
        """Start a loop via ``run_in_loop``, run :meth:`setup` and then the worker."""
        self.allowed_to_run = True

        async def bootstrap(i: Interface, worker):
            self._cs = current_coro_scheduler()
            await i(RunCoro, self.setup)
            await i(RunCoro, worker)

        run_in_loop(bootstrap, self._worker)

    async def setup(self, i: Interface):
        """Issue the two ``AsyncioLoopRequest`` calls the worker loop starts with."""
        await i(AsyncioLoopRequest().ensure_loop(interrupt_when_no_requests=True))
        await i(AsyncioLoopRequest().turn_on_loops_intercommunication())

    def stop(self):
        """Queue a ``ShutdownLoop`` request on the worker's scheduler.

        Returns None without queuing anything when :meth:`run` was never
        called, when stop was already requested, or when the scheduler has
        not been captured yet; otherwise returns what
        ``_add_direct_request`` returns.
        """
        if not self.allowed_to_run:
            return None

        self.allowed_to_run = False
        if self._cs is None:
            return None

        async def request_shutdown(i: Interface):
            await i(ShutdownLoop)

        put_coro: PutCoro = self._cs.get_service_instance(PutCoro)
        return put_coro._add_direct_request(request_shutdown)
QObject(parent: typing.Optional[QObject] = None)
CoroThreadWorker( worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]])
async def
setup( self, i: cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface):
Inherited Members
- PyQt6.QtCore.QObject
- blockSignals
- childEvent
- children
- connectNotify
- customEvent
- deleteLater
- disconnect
- disconnectNotify
- dumpObjectInfo
- dumpObjectTree
- dynamicPropertyNames
- event
- eventFilter
- findChild
- findChildren
- inherits
- installEventFilter
- isQuickItemType
- isSignalConnected
- isWidgetType
- isWindowType
- killTimer
- metaObject
- moveToThread
- objectName
- parent
- property
- pyqtConfigure
- receivers
- removeEventFilter
- sender
- senderSignalIndex
- setObjectName
- setParent
- setProperty
- signalsBlocked
- startTimer
- thread
- timerEvent
- tr
- staticMetaObject
- objectNameChanged
- destroyed
class CoroThreadWithWorker(CoroThreadWorker):
    """Worker bundled with the ``CoroThread`` that hosts it.

    Args:
        parent: Passed to the ``CoroThread`` constructor.
        worker: Passed to the ``CoroThreadWorker`` constructor.
    """

    def __init__(self, parent, worker: AnyWorker) -> None:
        super().__init__(worker)
        hosting_thread: CoroThread = CoroThread(parent, self)
        self._thread: CoroThread = hosting_thread

    def start(self):
        """Start the hosting thread."""
        self._thread.start()

    def stop(self):
        """Run the inherited ``stop`` first, then stop the hosting thread."""
        CoroThreadWorker.stop(self)
        self._thread.stop()
QObject(parent: typing.Optional[QObject] = None)
CoroThreadWithWorker( parent, worker: typing.Union[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.ExplicitWorker, collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Any], collections.abc.Callable[cengal.parallel_execution.coroutines.coro_scheduler.versions.v_0.coro_scheduler.Interface, typing.Awaitable[typing.Any]]])
Inherited Members
- PyQt6.QtCore.QObject
- blockSignals
- childEvent
- children
- connectNotify
- customEvent
- deleteLater
- disconnect
- disconnectNotify
- dumpObjectInfo
- dumpObjectTree
- dynamicPropertyNames
- event
- eventFilter
- findChild
- findChildren
- inherits
- installEventFilter
- isQuickItemType
- isSignalConnected
- isWidgetType
- isWindowType
- killTimer
- metaObject
- moveToThread
- objectName
- parent
- property
- pyqtConfigure
- receivers
- removeEventFilter
- sender
- senderSignalIndex
- setObjectName
- setParent
- setProperty
- signalsBlocked
- startTimer
- thread
- timerEvent
- tr
- staticMetaObject
- objectNameChanged
- destroyed