cengal.parallel_execution.asyncio.run_in_process_pool.versions.v_0.run_in_process_pool
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ['run_coroutine_in_new_thread', 'ProcessPoolRuntimeError', 'ExecutorSetupBase', 'ExecutorTypeSetup', 'ExecutorInstanceSetup', 'InitializerSetup', 'ProcessPoolSetup', 'ProcessPool']

"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


from cengal.introspection.inspect import get_exception, is_async
from cengal.code_flow_control.smart_values import ResultHolder

import sys
import asyncio
from concurrent.futures import Executor
from threading import Thread
from functools import partial
from typing import Callable, Optional, Type, Any, Tuple


class ProcessPoolRuntimeError(RuntimeError):
    """Raised by ``ProcessPool`` when it is configured with an unsupported setup object."""
    pass


class ExecutorSetupBase:
    """Common base class of the objects that tell ``ProcessPool`` where its executor comes from."""
    pass


class ExecutorTypeSetup(ExecutorSetupBase):
    """Executor description: a class plus the arguments ``ProcessPool`` will construct it with."""

    def __init__(self, executor_type: Type[Executor], *args, **kwargs) -> None:
        self.executor_type: Type[Executor] = executor_type
        self.args = args
        self.kwargs = kwargs
        super().__init__()


class ExecutorInstanceSetup(ExecutorSetupBase):
    """Executor description: an already constructed executor that ``ProcessPool`` reuses."""

    def __init__(self, executor: Executor) -> None:
        self.executor: Executor = executor
        super().__init__()


class InitializerSetup:
    """Callable (with its extra arguments) that ``ProcessPool`` runs before the first job of a process."""

    def __init__(self, initializer: Callable, *args, **kwargs) -> None:
        self.initializer: Callable = initializer
        self.args = args
        self.kwargs = kwargs


class ProcessPoolSetup:
    """Runtime options of ``ProcessPool``: the event loop to use and the multiprocessing switch."""

    def __init__(self, is_multiprocessing: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Store the options.

        Args:
            is_multiprocessing: when False, ``ProcessPool`` runs workers in the
                calling process instead of submitting them to the executor.
            loop: event loop used for ``run_in_executor``. When omitted, the
                running loop is used if there is one, otherwise the result of
                ``asyncio.get_event_loop()``.
        """
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except (AttributeError, RuntimeError):
                # AttributeError: interpreter without get_running_loop();
                # RuntimeError: no loop is running in this thread.
                loop = asyncio.get_event_loop()

        self.loop: Optional[asyncio.AbstractEventLoop] = loop
        self.is_multiprocessing: bool = is_multiprocessing


def run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
    """Run ``coro(*args, **kwargs)`` to completion with ``asyncio.run`` in a fresh thread.

    The calling thread blocks until the helper thread finishes.

    Args:
        coro: coroutine function to call.
        *args: positional arguments for ``coro``.
        **kwargs: keyword arguments for ``coro``. They are captured by the
            closure, so any keyword name is allowed.

    Returns:
        ``(result, exception)``: ``exception`` is None on success; otherwise
        ``result`` is None and ``exception`` is the value of ``get_exception()``
        taken in the helper thread. If the holder was never filled, the pair is
        ``(None, RuntimeError('ProcessPool internal error'))``.
    """
    result_holder: ResultHolder = ResultHolder()

    def thread_worker() -> None:
        result = None
        exception = None
        try:
            result = asyncio.run(coro(*args, **kwargs))
        except BaseException:
            # Deliberately broad: the failure is handed back to the caller as data.
            exception = get_exception()

        result_holder.value = (result, exception)

    thread: Thread = Thread(target=thread_worker)
    thread.start()
    thread.join()
    # NOTE(review): relies on ResultHolder being truthy once a value is stored - confirm.
    if result_holder:
        result, exception = result_holder.value
    else:
        result = None
        exception = RuntimeError('ProcessPool internal error')

    return result, exception


class ProcessPool:
    """Run sync or async workers through a ``concurrent.futures`` executor from asyncio code.

    Worker failures are captured where the worker runs and travel back as a
    ``(result, exception)`` pair; ``pool_execute`` re-raises the exception in
    the caller.
    """

    def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
        """Remember the configuration and create (or adopt) the executor.

        Args:
            executor_setup: ``ExecutorTypeSetup`` or ``ExecutorInstanceSetup``.
            initializer_setup: optional per-process initializer description.
            process_pool_setup: loop / multiprocessing options; a default
                ``ProcessPoolSetup()`` is created when omitted.

        Raises:
            ProcessPoolRuntimeError: ``executor_setup`` has an unsupported type.
        """
        self.executor_setup: ExecutorSetupBase = executor_setup
        self.process_pool_setup: Optional[ProcessPoolSetup] = process_pool_setup or ProcessPoolSetup()
        self.initializer_setup: Optional[InitializerSetup] = initializer_setup
        self.executor: Optional[Executor] = None
        self.partial_pool_initializer: Optional[Callable] = None

        self._create_pool()

    def set_is_multiprocessing(self, is_multiprocessing: bool):
        """Switch between executor-based and in-process execution."""
        self.process_pool_setup.is_multiprocessing = is_multiprocessing

    def _create_pool(self):
        """Build ``partial_pool_initializer`` and obtain the executor from ``executor_setup``."""
        setup = self.executor_setup
        if self.initializer_setup is not None:
            self.partial_pool_initializer = partial(ProcessPool._initializer, self.initializer_setup.initializer, *self.initializer_setup.args, **self.initializer_setup.kwargs)

        if isinstance(setup, ExecutorInstanceSetup):
            self.executor = setup.executor
        elif isinstance(setup, ExecutorTypeSetup):
            # Work on a copy: the setup object may be shared between several
            # pools and must not inherit this pool's initializer.
            executor_kwargs = dict(setup.kwargs)
            if ((3, 7) <= sys.version_info) and (self.partial_pool_initializer is not None):
                # The executor "initializer" argument exists since Python 3.7.
                executor_kwargs['initializer'] = self.partial_pool_initializer

            self.executor = setup.executor_type(*setup.args, **executor_kwargs)
        else:
            raise ProcessPoolRuntimeError('Unknown "executor_setup" parameter type')

    def shutdown(self, wait=True, cancel_futures=False):
        """Shut the executor down.

        ``cancel_futures`` is forwarded only on Python 3.9+, where
        ``Executor.shutdown`` accepts it; on older interpreters it is ignored.
        """
        if (3, 9) <= sys.version_info:
            self.executor.shutdown(wait, cancel_futures=cancel_futures)
        else:
            self.executor.shutdown(wait)

    @staticmethod
    def _initializer(initializer, *args, **kwargs):
        """Call the user initializer with the current process ``_identity`` as first argument."""
        import multiprocessing

        initializer(multiprocessing.current_process()._identity, *args, **kwargs)

    @staticmethod
    def _ensure_process_initialized(partial_pool_initializer: Optional[Callable]) -> None:
        """Run ``partial_pool_initializer`` once per process, tracked by the module global flag."""
        global process_initialized
        if not globals().get('process_initialized', False):
            if partial_pool_initializer is not None:
                partial_pool_initializer()

            process_initialized = True

    @staticmethod
    def _pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Call a sync ``worker`` and return ``(result, exception)`` instead of raising."""
        result = None
        exception = None
        try:
            result = worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is returned to the caller as data.
            exception = get_exception()

        return result, exception

    @staticmethod
    def _pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_pool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._pool_worker(worker, *args, **kwargs)

    @staticmethod
    def _apool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Run an async ``worker`` via ``run_coroutine_in_new_thread`` and return its pair."""
        return run_coroutine_in_new_thread(worker, *args, **kwargs)

    @staticmethod
    def _apool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_apool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._apool_worker(worker, *args, **kwargs)

    @staticmethod
    async def _a_single_process_pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Await an async ``worker`` in the current loop and return ``(result, exception)``."""
        result = None
        exception = None
        try:
            result = await worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is returned to the caller as data.
            exception = get_exception()

        return result, exception

    @staticmethod
    async def _a_single_process_pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_a_single_process_pool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return await ProcessPool._a_single_process_pool_worker(worker, *args, **kwargs)

    async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
        """Execute ``worker(*args, **kwargs)`` and return its result.

        In multiprocessing mode the job is submitted to the executor through
        ``loop.run_in_executor``; otherwise it runs in the calling process
        (a sync worker is called directly and blocks the loop meanwhile).

        Raises:
            BaseException: whatever the worker raised, re-raised here.
        """
        worker_is_async = is_async(worker)
        initializer = self.partial_pool_initializer
        if self.process_pool_setup.is_multiprocessing:
            # _create_pool hands the initializer to the executor only when it
            # builds the executor itself on Python 3.7+. In every other case
            # the wrapper has to run it lazily inside the worker process.
            initializer_in_executor = ((3, 7) <= sys.version_info) and isinstance(self.executor_setup, ExecutorTypeSetup)
            if (initializer is None) or initializer_in_executor:
                target = ProcessPool._apool_worker if worker_is_async else ProcessPool._pool_worker
                job = partial(target, worker, *args, **kwargs)
            else:
                target = ProcessPool._apool_worker_wrapper if worker_is_async else ProcessPool._pool_worker_wrapper
                job = partial(target, initializer, worker, *args, **kwargs)

            # NOTE(review): the loop was captured when ProcessPoolSetup was
            # built; presumably it must be the loop running this coroutine - confirm.
            outcome: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, job)
        elif worker_is_async:
            outcome = await ProcessPool._a_single_process_pool_worker_wrapper(initializer, worker, *args, **kwargs)
        else:
            outcome = ProcessPool._pool_worker_wrapper(initializer, worker, *args, **kwargs)

        result, exception = outcome

        if exception is not None:
            raise exception

        return result

    async def __call__(self, worker: Callable, *args, **kwargs) -> Any:
        """Shortcut for ``pool_execute``."""
        return await self.pool_execute(worker, *args, **kwargs)


# Legacy prototype, kept for reference only.

# def pool_initializer(text):
#     import multiprocessing

#     print(text, multiprocessing.current_process()._identity[0])


# def create_pool():
#     global executor_init_params
#     executor_init_params = (('hello pool',), dict())
#     global executor
#     if (3, 7) <= sys.version_info:
#         pool_args, pool_kwargs = executor_init_params
#         partial_pool_initializer = partial(pool_initializer, *pool_args, **pool_kwargs)
#         executor = ProcessPoolExecutor(max_workers=2, initializer=partial_pool_initializer)
#     else:
#         executor = ProcessPoolExecutor(max_workers=2)


# def pool_worker_impl(item: int):
#     return 1000 / item


# def pool_worker(*args, **kwargs):
#     result = None
#     exception = None
#     try:
#         result = pool_worker_impl(*args, **kwargs)
#     except:
#         exception = get_exception()

#     return result, exception


# def pool_worker_wrapper(pool_init_params, *args, **kwargs):
#     global process_initialized
#     if 'process_initialized' not in globals():
#         process_initialized = False

#     if not process_initialized:
#         pool_args, pool_kwargs = pool_init_params
#         pool_initializer(*pool_args, **pool_kwargs)
#         process_initialized = True

#     return pool_worker(*args, **kwargs)


# async def pool_execute(*args, **kwargs):
#     if (3, 7) <= sys.version_info:
#         partial_pool_worker = partial(pool_worker, *args, **kwargs)
#     else:
#         partial_pool_worker = partial(pool_worker_wrapper, executor_init_params, *args, **kwargs)

#     if is_multiprocessing:
#         result = await loop.run_in_executor(executor, partial_pool_worker)
#     else:
#         result = pool_worker(*args, **kwargs)

#     result, exception = result

#     if exception is not None:
#         raise exception

#     return result


# async def pool_single_processing_example(item: int = 2):
#     return await pool_execute(item)


# async def pool_gather_example(num: int = 3):
#     return await asyncio.gather(*[pool_execute(i) for i in range(num)])
def
run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Any:
def run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
    """Run ``coro(*args, **kwargs)`` to completion with ``asyncio.run`` in a fresh thread.

    The calling thread blocks until the helper thread finishes.

    Args:
        coro: coroutine function to call.
        *args: positional arguments for ``coro``.
        **kwargs: keyword arguments for ``coro``. They are captured by the
            closure rather than forwarded through ``Thread(kwargs=...)``, so a
            keyword named ``coro`` or ``result_holder`` no longer collides with
            the helper's own parameters.

    Returns:
        ``(result, exception)``: ``exception`` is None on success; otherwise
        ``result`` is None and ``exception`` is the value of ``get_exception()``
        taken in the helper thread. If the holder was never filled, the pair is
        ``(None, RuntimeError('ProcessPool internal error'))``.
    """
    result_holder: ResultHolder = ResultHolder()

    def thread_worker() -> None:
        result = None
        exception = None
        try:
            import asyncio
            result = asyncio.run(coro(*args, **kwargs))
        except BaseException:
            # Deliberately broad: the failure is handed back to the caller as data.
            exception = get_exception()

        result_holder.value = (result, exception)

    thread: Thread = Thread(target=thread_worker)
    thread.start()
    thread.join()
    # NOTE(review): relies on ResultHolder being truthy once a value is stored - confirm.
    if result_holder:
        result, exception = result_holder.value
    else:
        result = None
        exception = RuntimeError('ProcessPool internal error')

    return result, exception
class
ProcessPoolRuntimeError(builtins.RuntimeError):
Unspecified run-time error.
Inherited Members
- builtins.RuntimeError
- RuntimeError
- builtins.BaseException
- with_traceback
- args
class
ExecutorSetupBase:
class ExecutorTypeSetup(ExecutorSetupBase):
    """Executor description made of a class and the arguments to construct it with.

    Attributes are plain storage:
    ``executor_type`` - the executor class;
    ``args`` / ``kwargs`` - positional and keyword arguments for that class.
    """

    def __init__(self, executor_type: Type[Executor], *args, **kwargs) -> None:
        super().__init__()
        self.executor_type = executor_type
        self.args, self.kwargs = args, kwargs
class ExecutorInstanceSetup(ExecutorSetupBase):
    """Executor description that wraps an already constructed executor.

    The instance is stored unchanged in the ``executor`` attribute.
    """

    def __init__(self, executor: Executor) -> None:
        super().__init__()
        self.executor = executor
class
InitializerSetup:
class InitializerSetup:
    """Bundle of an initializer callable and the extra arguments to call it with.

    ``initializer`` holds the callable, ``args`` the extra positional
    arguments (a tuple) and ``kwargs`` the extra keyword arguments (a dict).
    """

    def __init__(self, initializer: Callable, *args, **kwargs) -> None:
        self.initializer, self.args, self.kwargs = initializer, args, kwargs
class
ProcessPoolSetup:
class ProcessPoolSetup:
    """Runtime options of ``ProcessPool``: the event loop to use and the multiprocessing switch."""

    def __init__(self, is_multiprocessing: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Store the options.

        Args:
            is_multiprocessing: flag stored as-is in ``is_multiprocessing``.
            loop: event loop stored in ``loop``. When omitted, the running loop
                is used if there is one, otherwise the result of
                ``asyncio.get_event_loop()``.
        """
        # Explicit None check: a supplied loop is never replaced because of its truthiness.
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except (AttributeError, RuntimeError):
                # AttributeError: interpreter without get_running_loop();
                # RuntimeError: no loop is running in this thread.
                loop = asyncio.get_event_loop()

        self.loop: Optional[asyncio.AbstractEventLoop] = loop
        self.is_multiprocessing: bool = is_multiprocessing
class
ProcessPool:
class ProcessPool:
    """Run sync or async workers through a ``concurrent.futures`` executor from asyncio code.

    Worker failures are captured where the worker runs and travel back as a
    ``(result, exception)`` pair; ``pool_execute`` re-raises the exception in
    the caller.
    """

    def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
        """Remember the configuration and create (or adopt) the executor.

        Args:
            executor_setup: ``ExecutorTypeSetup`` or ``ExecutorInstanceSetup``.
            initializer_setup: optional per-process initializer description.
            process_pool_setup: loop / multiprocessing options; a default
                ``ProcessPoolSetup()`` is created when omitted.

        Raises:
            ProcessPoolRuntimeError: ``executor_setup`` has an unsupported type.
        """
        self.executor_setup: ExecutorSetupBase = executor_setup
        self.process_pool_setup: Optional[ProcessPoolSetup] = process_pool_setup or ProcessPoolSetup()
        self.initializer_setup: Optional[InitializerSetup] = initializer_setup
        self.executor: Optional[Executor] = None
        self.partial_pool_initializer: Optional[Callable] = None

        self._create_pool()

    def set_is_multiprocessing(self, is_multiprocessing: bool):
        """Switch between executor-based and in-process execution."""
        self.process_pool_setup.is_multiprocessing = is_multiprocessing

    def _create_pool(self):
        """Build ``partial_pool_initializer`` and obtain the executor from ``executor_setup``."""
        setup = self.executor_setup
        if self.initializer_setup is not None:
            self.partial_pool_initializer = partial(ProcessPool._initializer, self.initializer_setup.initializer, *self.initializer_setup.args, **self.initializer_setup.kwargs)

        if isinstance(setup, ExecutorInstanceSetup):
            self.executor = setup.executor
        elif isinstance(setup, ExecutorTypeSetup):
            # Work on a copy: the setup object may be shared between several
            # pools and must not inherit this pool's initializer.
            executor_kwargs = dict(setup.kwargs)
            if ((3, 7) <= sys.version_info) and (self.partial_pool_initializer is not None):
                # The executor "initializer" argument exists since Python 3.7.
                executor_kwargs['initializer'] = self.partial_pool_initializer

            self.executor = setup.executor_type(*setup.args, **executor_kwargs)
        else:
            raise ProcessPoolRuntimeError('Unknown "executor_setup" parameter type')

    def shutdown(self, wait=True, cancel_futures=False):
        """Shut the executor down.

        ``cancel_futures`` is forwarded only on Python 3.9+, where
        ``Executor.shutdown`` accepts it; on older interpreters it is ignored.
        """
        if (3, 9) <= sys.version_info:
            self.executor.shutdown(wait, cancel_futures=cancel_futures)
        else:
            self.executor.shutdown(wait)

    @staticmethod
    def _initializer(initializer, *args, **kwargs):
        """Call the user initializer with the current process ``_identity`` as first argument."""
        import multiprocessing

        initializer(multiprocessing.current_process()._identity, *args, **kwargs)

    @staticmethod
    def _ensure_process_initialized(partial_pool_initializer: Optional[Callable]) -> None:
        """Run ``partial_pool_initializer`` once per process, tracked by the module global flag."""
        global process_initialized
        if not globals().get('process_initialized', False):
            if partial_pool_initializer is not None:
                partial_pool_initializer()

            process_initialized = True

    @staticmethod
    def _pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Call a sync ``worker`` and return ``(result, exception)`` instead of raising."""
        result = None
        exception = None
        try:
            result = worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is returned to the caller as data.
            exception = get_exception()

        return result, exception

    @staticmethod
    def _pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_pool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._pool_worker(worker, *args, **kwargs)

    # The former first definition of _apool_worker (taking a result_holder)
    # was shadowed by this one and therefore unreachable; it has been removed.
    @staticmethod
    def _apool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Run an async ``worker`` via ``run_coroutine_in_new_thread`` and return its pair."""
        return run_coroutine_in_new_thread(worker, *args, **kwargs)

    @staticmethod
    def _apool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_apool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._apool_worker(worker, *args, **kwargs)

    @staticmethod
    async def _a_single_process_pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Await an async ``worker`` in the current loop and return ``(result, exception)``."""
        result = None
        exception = None
        try:
            result = await worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is returned to the caller as data.
            exception = get_exception()

        return result, exception

    @staticmethod
    async def _a_single_process_pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """``_a_single_process_pool_worker`` preceded by the once-per-process initializer."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return await ProcessPool._a_single_process_pool_worker(worker, *args, **kwargs)

    async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
        """Execute ``worker(*args, **kwargs)`` and return its result.

        In multiprocessing mode the job is submitted to the executor through
        ``loop.run_in_executor``; otherwise it runs in the calling process
        (a sync worker is called directly and blocks the loop meanwhile).

        Raises:
            BaseException: whatever the worker raised, re-raised here.
        """
        worker_is_async = is_async(worker)
        initializer = self.partial_pool_initializer
        if self.process_pool_setup.is_multiprocessing:
            # _create_pool hands the initializer to the executor only when it
            # builds the executor itself on Python 3.7+. In every other case
            # the wrapper has to run it lazily inside the worker process.
            initializer_in_executor = ((3, 7) <= sys.version_info) and isinstance(self.executor_setup, ExecutorTypeSetup)
            if (initializer is None) or initializer_in_executor:
                target = ProcessPool._apool_worker if worker_is_async else ProcessPool._pool_worker
                job = partial(target, worker, *args, **kwargs)
            else:
                target = ProcessPool._apool_worker_wrapper if worker_is_async else ProcessPool._pool_worker_wrapper
                job = partial(target, initializer, worker, *args, **kwargs)

            # NOTE(review): the loop was captured when ProcessPoolSetup was
            # built; presumably it must be the loop running this coroutine - confirm.
            outcome: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, job)
        elif worker_is_async:
            outcome = await ProcessPool._a_single_process_pool_worker_wrapper(initializer, worker, *args, **kwargs)
        else:
            outcome = ProcessPool._pool_worker_wrapper(initializer, worker, *args, **kwargs)

        result, exception = outcome

        if exception is not None:
            raise exception

        return result

    async def __call__(self, worker: Callable, *args, **kwargs) -> Any:
        """Shortcut for ``pool_execute``."""
        return await self.pool_execute(worker, *args, **kwargs)
ProcessPool( executor_setup: ExecutorSetupBase, initializer_setup: Union[InitializerSetup, NoneType] = None, process_pool_setup: Union[ProcessPoolSetup, NoneType] = None)
def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
    """Store the three setup objects and immediately build the pool.

    A falsy ``process_pool_setup`` is replaced by a default ``ProcessPoolSetup()``.
    ``executor`` and ``partial_pool_initializer`` start as None and are
    filled in by ``_create_pool``.
    """
    if not process_pool_setup:
        process_pool_setup = ProcessPoolSetup()

    self.executor_setup = executor_setup
    self.process_pool_setup = process_pool_setup
    self.initializer_setup = initializer_setup

    # Placeholders until _create_pool() assigns the real objects.
    self.executor = None
    self.partial_pool_initializer = None

    self._create_pool()
executor_setup: ExecutorSetupBase
process_pool_setup: Union[ProcessPoolSetup, NoneType]
initializer_setup: Union[InitializerSetup, NoneType]
async def
pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
    """Execute ``worker(*args, **kwargs)`` and return its result.

    In multiprocessing mode the job is submitted to the executor through
    ``loop.run_in_executor``; otherwise it runs in the calling process
    (a sync worker is called directly and blocks the loop meanwhile).

    Args:
        worker: sync callable or coroutine function (decided by ``is_async``).
        *args: positional arguments for ``worker``.
        **kwargs: keyword arguments for ``worker``.

    Returns:
        The value produced by ``worker``.

    Raises:
        BaseException: whatever the worker raised, re-raised here.
    """
    worker_is_async = is_async(worker)
    initializer = self.partial_pool_initializer
    if self.process_pool_setup.is_multiprocessing:
        # The initializer is handed to the executor only when the pool builds
        # the executor itself (ExecutorTypeSetup) on Python 3.7+. With a
        # ready-made executor instance it used to be skipped silently; the
        # wrapper now runs it lazily inside the worker process instead.
        initializer_in_executor = ((3, 7) <= sys.version_info) and isinstance(self.executor_setup, ExecutorTypeSetup)
        if (initializer is None) or initializer_in_executor:
            target = ProcessPool._apool_worker if worker_is_async else ProcessPool._pool_worker
            job = partial(target, worker, *args, **kwargs)
        else:
            target = ProcessPool._apool_worker_wrapper if worker_is_async else ProcessPool._pool_worker_wrapper
            job = partial(target, initializer, worker, *args, **kwargs)

        # NOTE(review): the loop was captured when ProcessPoolSetup was
        # built; presumably it must be the loop running this coroutine - confirm.
        outcome: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, job)
    elif worker_is_async:
        outcome = await ProcessPool._a_single_process_pool_worker_wrapper(initializer, worker, *args, **kwargs)
    else:
        outcome = ProcessPool._pool_worker_wrapper(initializer, worker, *args, **kwargs)

    result, exception = outcome

    if exception is not None:
        raise exception

    return result