cengal.parallel_execution.asyncio.run_in_process_pool.versions.v_0.run_in_process_pool

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18__all__ = ['run_coroutine_in_new_thread', 'ProcessPoolRuntimeError', 'ExecutorSetupBase', 'ExecutorTypeSetup', 'ExecutorInstanceSetup', 'InitializerSetup', 'ProcessPoolSetup', 'ProcessPool']
 19
 20"""
 21Module Docstring
 22Docstrings: http://www.python.org/dev/peps/pep-0257/
 23"""
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37from cengal.introspection.inspect import get_exception, is_async
 38from cengal.code_flow_control.smart_values import ResultHolder
 39
 40import sys
 41import asyncio
 42from concurrent.futures import Executor
 43from threading import Thread
 44from functools import partial
 45from typing import Callable, Optional, Type, Any, Tuple
 46
 47
 48class ProcessPoolRuntimeError(RuntimeError):
 49    pass
 50
 51
 52class ExecutorSetupBase:
 53    pass
 54
 55
 56class ExecutorTypeSetup(ExecutorSetupBase):
 57    def __init__(self, executor_type: Type[Executor], *args, **kwargs) -> None:
 58        self.executor_type: Type[Executor] = executor_type
 59        self.args = args
 60        self.kwargs = kwargs
 61        super().__init__()
 62
 63
 64class ExecutorInstanceSetup(ExecutorSetupBase):
 65    def __init__(self, executor: Executor) -> None:
 66        self.executor: Executor = executor
 67        super().__init__()
 68
 69
 70class InitializerSetup:
 71    def __init__(self, initializer: Callable, *args, **kwargs) -> None:
 72        self.initializer: Callable = initializer
 73        self.args = args
 74        self.kwargs = kwargs
 75
 76
 77class ProcessPoolSetup:
 78    def __init__(self, is_multiprocessing: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
 79        self.loop: Optional[asyncio.AbstractEventLoop] = loop or asyncio.get_event_loop()
 80        self.is_multiprocessing: bool = is_multiprocessing
 81
 82
 83def run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Any:
 84    def thread_worker(result_holder: ResultHolder, coro: Callable, *args, **kwargs):
 85        result = None
 86        exception = None
 87        try:
 88            import asyncio
 89            result = asyncio.run(coro(*args, **kwargs))
 90        except:
 91            exception = get_exception()
 92        
 93        result_holder.value = (result, exception)
 94    
 95    result_holder: ResultHolder = ResultHolder()
 96    thread_args = [result_holder, coro,] + list(args)
 97    thread: Thread = Thread(target=thread_worker, args=thread_args, kwargs=kwargs)
 98    thread.start()
 99    thread.join()
100    if result_holder:
101        result, exception = result_holder.value
102    else:
103        result = None
104        exception = RuntimeError('ProcessPool internal error')
105    
106    return result, exception
107
108
class ProcessPool:
    """Awaitable front-end that runs workers through a ``concurrent.futures`` executor.

    A worker (plain or async callable) is submitted with ``pool_execute`` or by
    awaiting the instance itself. Workers never raise inside the executor:
    every worker entry point returns ``(result, exception)`` and
    ``pool_execute`` re-raises the exception in the caller.

    When ``process_pool_setup.is_multiprocessing`` is false, the worker is run
    directly in the calling process instead of in the executor.

    NOTE(review): on Python 3.7+ the initializer is only installed when the
    executor is built from an ``ExecutorTypeSetup``. With an
    ``ExecutorInstanceSetup`` it is not applied in multiprocessing mode -
    confirm this is intended.
    """

    def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
        """Store the configuration and create (or adopt) the executor.

        Args:
            executor_setup: ``ExecutorTypeSetup`` or ``ExecutorInstanceSetup``.
            initializer_setup: Optional initializer description.
            process_pool_setup: Optional runtime options; a default
                ``ProcessPoolSetup()`` is created when omitted.

        Raises:
            ProcessPoolRuntimeError: ``executor_setup`` is of an unknown type.
        """
        self.executor_setup: ExecutorSetupBase = executor_setup
        self.process_pool_setup: Optional[ProcessPoolSetup] = process_pool_setup or ProcessPoolSetup()
        self.initializer_setup: Optional[InitializerSetup] = initializer_setup
        self.executor: Optional[Executor] = None
        self.partial_pool_initializer: Optional[Callable] = None

        self._create_pool()

    def set_is_multiprocessing(self, is_multiprocessing: bool):
        """Switch between executor mode (True) and in-process mode (False)."""
        self.process_pool_setup.is_multiprocessing = is_multiprocessing

    def _create_pool(self):
        """Build the initializer partial and obtain the executor.

        Raises:
            ProcessPoolRuntimeError: ``executor_setup`` is neither an
                ``ExecutorInstanceSetup`` nor an ``ExecutorTypeSetup``.
        """
        if self.initializer_setup is not None:
            self.partial_pool_initializer = partial(ProcessPool._initializer, self.initializer_setup.initializer, *self.initializer_setup.args, **self.initializer_setup.kwargs)

        setup = self.executor_setup
        if isinstance(setup, ExecutorInstanceSetup):
            # A ready executor is adopted as is, on every Python version.
            self.executor = setup.executor
        elif isinstance(setup, ExecutorTypeSetup):
            # Work on a copy so the caller's setup object is not modified and
            # can be reused for another pool.
            executor_kwargs = dict(setup.kwargs)
            # The ``initializer`` keyword is only passed on Python 3.7+; older
            # versions go through the *_wrapper workers instead.
            if ((3, 7) <= sys.version_info) and (self.partial_pool_initializer is not None):
                executor_kwargs['initializer'] = self.partial_pool_initializer

            self.executor = setup.executor_type(*setup.args, **executor_kwargs)
        else:
            raise ProcessPoolRuntimeError('Unknown "executor_setup" parameter type')

    def shutdown(self, wait=True, cancel_futures=False):
        """Shut the executor down.

        ``cancel_futures`` is only forwarded when it is truthy, so the default
        call also works with executors whose ``shutdown`` does not accept that
        keyword (it was added to ``Executor.shutdown`` in Python 3.9).
        """
        if cancel_futures:
            self.executor.shutdown(wait, cancel_futures=cancel_futures)
        else:
            self.executor.shutdown(wait)

    @staticmethod
    def _initializer(initializer, *args, **kwargs):
        """Call ``initializer`` with the current process identity prepended to its arguments."""
        import multiprocessing

        initializer(multiprocessing.current_process()._identity, *args, **kwargs)

    @staticmethod
    def _ensure_process_initialized(partial_pool_initializer: Optional[Callable]) -> None:
        """Run the initializer once per process, tracked by a module-level flag.

        The flag is only set after the initializer returned, so a failing
        initializer is retried on the next call.
        """
        global process_initialized
        if globals().get('process_initialized', False):
            return

        if partial_pool_initializer is not None:
            partial_pool_initializer()

        process_initialized = True

    @staticmethod
    def _pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Call a synchronous worker and return ``(result, exception)`` instead of raising."""
        result = None
        exception = None
        try:
            result = worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is transported to the caller.
            exception = get_exception()

        return result, exception

    @staticmethod
    def _pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Initialize the process if needed, then run a synchronous worker."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._pool_worker(worker, *args, **kwargs)

    @staticmethod
    def _apool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Run an async worker to completion on a new thread; returns ``(result, exception)``."""
        return run_coroutine_in_new_thread(worker, *args, **kwargs)

    @staticmethod
    def _apool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Initialize the process if needed, then run an async worker."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return ProcessPool._apool_worker(worker, *args, **kwargs)

    @staticmethod
    async def _a_single_process_pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Await an async worker on the current loop and return ``(result, exception)``."""
        result = None
        exception = None
        try:
            result = await worker(*args, **kwargs)
        except BaseException:
            # Deliberately broad: the failure is transported to the caller.
            exception = get_exception()

        return result, exception

    @staticmethod
    async def _a_single_process_pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
        """Initialize the process if needed, then await an async worker in-process."""
        ProcessPool._ensure_process_initialized(partial_pool_initializer)
        return await ProcessPool._a_single_process_pool_worker(worker, *args, **kwargs)

    async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
        """Run ``worker(*args, **kwargs)`` and return its result.

        In multiprocessing mode the call is submitted to the executor through
        the configured event loop; otherwise it runs in the current process.

        Raises:
            BaseException: whatever the worker raised is re-raised here.
        """
        worker_is_async = is_async(worker)
        if self.process_pool_setup.is_multiprocessing:
            if ((3, 7) <= sys.version_info) or (self.partial_pool_initializer is None):
                # No wrapper needed: either the executor runs the initializer
                # itself or there is no initializer at all.
                target = ProcessPool._apool_worker if worker_is_async else ProcessPool._pool_worker
                partial_pool_worker = partial(target, worker, *args, **kwargs)
            else:
                target = ProcessPool._apool_worker_wrapper if worker_is_async else ProcessPool._pool_worker_wrapper
                partial_pool_worker = partial(target, self.partial_pool_initializer, worker, *args, **kwargs)

            outcome: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, partial_pool_worker)
        elif worker_is_async:
            outcome = await ProcessPool._a_single_process_pool_worker_wrapper(self.partial_pool_initializer, worker, *args, **kwargs)
        else:
            outcome = ProcessPool._pool_worker_wrapper(self.partial_pool_initializer, worker, *args, **kwargs)

        result, exception = outcome
        if exception is not None:
            raise exception

        return result

    async def __call__(self, worker: Callable, *args, **kwargs) -> Any:
        """Shorthand for ``pool_execute``."""
        return await self.pool_execute(worker, *args, **kwargs)
259
260
261# def pool_initializer(text):
262#     import multiprocessing
263    
264#     print(text, multiprocessing.current_process()._identity[0])
265
266
267# def create_pool():
268#     global executor_init_params
269#     executor_init_params = (('hello pool',), dict())
270#     global executor
271#     if (3, 7) <= sys.version_info:
272#         pool_args, pool_kwargs = executor_init_params
273#         partial_pool_initializer = partial(pool_initializer, *pool_args, **pool_kwargs)
274#         executor = ProcessPoolExecutor(max_workers=2, initializer=partial_pool_initializer)
275#     else:
276#         executor = ProcessPoolExecutor(max_workers=2)
277
278
279# def pool_worker_impl(item: int):
280#     return 1000 / item
281
282
283# def pool_worker(*args, **kwargs):
284#     result = None
285#     exception = None
286#     try:
287#         result = pool_worker_impl(*args, **kwargs)
288#     except:
289#         exception = get_exception()
290    
291#     return result, exception
292
293
294# def pool_worker_wrapper(pool_init_params, *args, **kwargs):
295#     global process_initialized
296#     if 'process_initialized' not in globals():
297#         process_initialized = False
298    
299#     if not process_initialized:
300#         pool_args, pool_kwargs = pool_init_params
301#         pool_initializer(*pool_args, **pool_kwargs)
302#         process_initialized = True
303    
304#     return pool_worker(*args, **kwargs)
305        
306
307# async def pool_execute(*args, **kwargs):
308#     if (3, 7) <= sys.version_info:
309#         partial_pool_worker = partial(pool_worker, *args, **kwargs)
310#     else:
311#         partial_pool_worker = partial(pool_worker_wrapper, executor_init_params, *args, **kwargs)
312        
313#     if is_multiprocessing:
314#         result = await loop.run_in_executor(executor, partial_pool_worker)
315#     else:
316#         result = pool_worker(*args, **kwargs)
317    
318#     result, exception = result
319    
320#     if exception is not None:
321#         raise exception
322    
323#     return result
324
325
326# async def pool_single_processing_example(item: int = 2):
327#     return await pool_execute(item)
328
329
330# async def pool_gather_example(num: int = 3):
331#     return await asyncio.gather(*[pool_execute(i) for i in range(num)])
def run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Any:
 84def run_coroutine_in_new_thread(coro: Callable, *args, **kwargs) -> Any:
 85    def thread_worker(result_holder: ResultHolder, coro: Callable, *args, **kwargs):
 86        result = None
 87        exception = None
 88        try:
 89            import asyncio
 90            result = asyncio.run(coro(*args, **kwargs))
 91        except:
 92            exception = get_exception()
 93        
 94        result_holder.value = (result, exception)
 95    
 96    result_holder: ResultHolder = ResultHolder()
 97    thread_args = [result_holder, coro,] + list(args)
 98    thread: Thread = Thread(target=thread_worker, args=thread_args, kwargs=kwargs)
 99    thread.start()
100    thread.join()
101    if result_holder:
102        result, exception = result_holder.value
103    else:
104        result = None
105        exception = RuntimeError('ProcessPool internal error')
106    
107    return result, exception
class ProcessPoolRuntimeError(builtins.RuntimeError):
49class ProcessPoolRuntimeError(RuntimeError):
50    pass

Unspecified run-time error.

Inherited Members
builtins.RuntimeError
RuntimeError
builtins.BaseException
with_traceback
args
class ExecutorSetupBase:
53class ExecutorSetupBase:
54    pass
class ExecutorTypeSetup(ExecutorSetupBase):
57class ExecutorTypeSetup(ExecutorSetupBase):
58    def __init__(self, executor_type: Type[Executor], *args, **kwargs) -> None:
59        self.executor_type: Type[Executor] = executor_type
60        self.args = args
61        self.kwargs = kwargs
62        super().__init__()
ExecutorTypeSetup( executor_type: Type[concurrent.futures._base.Executor], *args, **kwargs)
58    def __init__(self, executor_type: Type[Executor], *args, **kwargs) -> None:
59        self.executor_type: Type[Executor] = executor_type
60        self.args = args
61        self.kwargs = kwargs
62        super().__init__()
executor_type: Type[concurrent.futures._base.Executor]
args
kwargs
class ExecutorInstanceSetup(ExecutorSetupBase):
65class ExecutorInstanceSetup(ExecutorSetupBase):
66    def __init__(self, executor: Executor) -> None:
67        self.executor: Executor = executor
68        super().__init__()
ExecutorInstanceSetup(executor: concurrent.futures._base.Executor)
66    def __init__(self, executor: Executor) -> None:
67        self.executor: Executor = executor
68        super().__init__()
executor: concurrent.futures._base.Executor
class InitializerSetup:
71class InitializerSetup:
72    def __init__(self, initializer: Callable, *args, **kwargs) -> None:
73        self.initializer: Callable = initializer
74        self.args = args
75        self.kwargs = kwargs
InitializerSetup(initializer: Callable, *args, **kwargs)
72    def __init__(self, initializer: Callable, *args, **kwargs) -> None:
73        self.initializer: Callable = initializer
74        self.args = args
75        self.kwargs = kwargs
initializer: Callable
args
kwargs
class ProcessPoolSetup:
78class ProcessPoolSetup:
79    def __init__(self, is_multiprocessing: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
80        self.loop: Optional[asyncio.AbstractEventLoop] = loop or asyncio.get_event_loop()
81        self.is_multiprocessing: bool = is_multiprocessing
ProcessPoolSetup( is_multiprocessing: bool = True, loop: Union[asyncio.events.AbstractEventLoop, NoneType] = None)
79    def __init__(self, is_multiprocessing: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
80        self.loop: Optional[asyncio.AbstractEventLoop] = loop or asyncio.get_event_loop()
81        self.is_multiprocessing: bool = is_multiprocessing
loop: Union[asyncio.events.AbstractEventLoop, NoneType]
is_multiprocessing: bool
class ProcessPool:
110class ProcessPool:
111    def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
112        self.executor_setup: ExecutorSetupBase = executor_setup
113        self.process_pool_setup: Optional[ProcessPoolSetup] = process_pool_setup or ProcessPoolSetup()
114        self.initializer_setup: Optional[InitializerSetup] = initializer_setup
115        self.executor: Executor = None
116        self.partial_pool_initializer: Callable = None
117        
118        self._create_pool()
119    
120    def set_is_multiprocessing(self, is_multiprocessing: bool):
121        self.process_pool_setup.is_multiprocessing = is_multiprocessing
122    
123    def _create_pool(self):
124        if self.initializer_setup is not None:
125            self.partial_pool_initializer = partial(ProcessPool._initializer, self.initializer_setup.initializer, *self.initializer_setup.args, **self.initializer_setup.kwargs)
126        
127        if (3, 7) <= sys.version_info:
128            if isinstance(self.executor_setup, ExecutorInstanceSetup):
129                self.executor = self.executor_setup.executor
130            elif isinstance(self.executor_setup, ExecutorTypeSetup):
131                if self.partial_pool_initializer is not None:
132                    self.executor_setup.kwargs['initializer'] = self.partial_pool_initializer
133                
134                self.executor = self.executor_setup.executor_type(*self.executor_setup.args, **self.executor_setup.kwargs)
135            else:
136                raise ProcessPoolRuntimeError('Unknown "executor_setup" parameter type')
137        else:
138            self.executor = self.executor_setup.executor_type(*self.executor_setup.args, **self.executor_setup.kwargs)
139    
140    def shutdown(self, wait=True, cancel_futures=False):
141        self.executor.shutdown(wait, cancel_futures=cancel_futures)
142    
143    @staticmethod
144    def _initializer(initializer, *args, **kwargs):
145        import multiprocessing
146        
147        initializer(multiprocessing.current_process()._identity, *args, **kwargs)
148
149    @staticmethod
150    def _pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
151        result = None
152        exception = None
153        try:
154            result = worker(*args, **kwargs)
155        except:
156            exception = get_exception()
157        
158        return result, exception
159
160    @staticmethod
161    def _pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
162        global process_initialized
163        if 'process_initialized' not in globals():
164            process_initialized = False
165        
166        if not process_initialized:
167            if partial_pool_initializer is not None:
168                partial_pool_initializer()
169            
170            process_initialized = True
171        
172        return ProcessPool._pool_worker(worker, *args, **kwargs)
173
174    @staticmethod
175    def _apool_worker(result_holder: ResultHolder, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
176        result = None
177        exception = None
178        try:
179            import asyncio
180            result = asyncio.run(worker(*args, **kwargs))
181        except:
182            exception = get_exception()
183        
184        result_holder.value = (result, exception)
185    
186    @staticmethod
187    def _apool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
188        return run_coroutine_in_new_thread(worker, *args, **kwargs)
189
190    @staticmethod
191    def _apool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
192        global process_initialized
193        if 'process_initialized' not in globals():
194            process_initialized = False
195        
196        if not process_initialized:
197            if partial_pool_initializer is not None:
198                partial_pool_initializer()
199
200            process_initialized = True
201        
202        return ProcessPool._apool_worker(worker, *args, **kwargs)
203
204    @staticmethod
205    async def _a_single_process_pool_worker(worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
206        result = None
207        exception = None
208        try:
209            result = await worker(*args, **kwargs)
210        except:
211            exception = get_exception()
212        
213        return result, exception
214
215    @staticmethod
216    async def _a_single_process_pool_worker_wrapper(partial_pool_initializer: Callable, worker: Callable, *args, **kwargs) -> Tuple[Any, Optional[BaseException]]:
217        global process_initialized
218        if 'process_initialized' not in globals():
219            process_initialized = False
220        
221        if not process_initialized:
222            if partial_pool_initializer is not None:
223                partial_pool_initializer()
224
225            process_initialized = True
226        
227        return await ProcessPool._a_single_process_pool_worker(worker, *args, **kwargs)
228
229    async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
230        if self.process_pool_setup.is_multiprocessing:
231            if ((3, 7) <= sys.version_info) or (self.partial_pool_initializer is None):
232                if is_async(worker):
233                    partial_pool_worker = partial(ProcessPool._apool_worker, worker, *args, **kwargs)
234                else:
235                    partial_pool_worker = partial(ProcessPool._pool_worker, worker, *args, **kwargs)
236            else:
237                if is_async(worker):
238                    partial_pool_worker = partial(ProcessPool._apool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
239                else:
240                    partial_pool_worker = partial(ProcessPool._pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
241            
242            result: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, partial_pool_worker)
243        else:
244            if is_async(worker):
245                a_single_process_pool_worker = partial(ProcessPool._a_single_process_pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
246                result = await a_single_process_pool_worker()
247            else:
248                partial_pool_worker = partial(ProcessPool._pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
249                result = partial_pool_worker()
250        
251        result, exception = result
252        
253        if exception is not None:
254            raise exception
255        
256        return result
257    
258    async def __call__(self, worker: Callable, *args, **kwargs) -> Any:
259        return await self.pool_execute(worker, *args, **kwargs)
ProcessPool( executor_setup: ExecutorSetupBase, initializer_setup: Union[InitializerSetup, NoneType] = None, process_pool_setup: Union[ProcessPoolSetup, NoneType] = None)
111    def __init__(self, executor_setup: ExecutorSetupBase, initializer_setup: Optional[InitializerSetup] = None, process_pool_setup: Optional[ProcessPoolSetup] = None) -> None:
112        self.executor_setup: ExecutorSetupBase = executor_setup
113        self.process_pool_setup: Optional[ProcessPoolSetup] = process_pool_setup or ProcessPoolSetup()
114        self.initializer_setup: Optional[InitializerSetup] = initializer_setup
115        self.executor: Executor = None
116        self.partial_pool_initializer: Callable = None
117        
118        self._create_pool()
executor_setup: ExecutorSetupBase
process_pool_setup: Union[ProcessPoolSetup, NoneType]
initializer_setup: Union[InitializerSetup, NoneType]
executor: concurrent.futures._base.Executor
partial_pool_initializer: Callable
def set_is_multiprocessing(self, is_multiprocessing: bool):
120    def set_is_multiprocessing(self, is_multiprocessing: bool):
121        self.process_pool_setup.is_multiprocessing = is_multiprocessing
def shutdown(self, wait=True, cancel_futures=False):
140    def shutdown(self, wait=True, cancel_futures=False):
141        self.executor.shutdown(wait, cancel_futures=cancel_futures)
async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
229    async def pool_execute(self, worker: Callable, *args, **kwargs) -> Any:
230        if self.process_pool_setup.is_multiprocessing:
231            if ((3, 7) <= sys.version_info) or (self.partial_pool_initializer is None):
232                if is_async(worker):
233                    partial_pool_worker = partial(ProcessPool._apool_worker, worker, *args, **kwargs)
234                else:
235                    partial_pool_worker = partial(ProcessPool._pool_worker, worker, *args, **kwargs)
236            else:
237                if is_async(worker):
238                    partial_pool_worker = partial(ProcessPool._apool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
239                else:
240                    partial_pool_worker = partial(ProcessPool._pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
241            
242            result: Tuple[Any, Optional[BaseException]] = await self.process_pool_setup.loop.run_in_executor(self.executor, partial_pool_worker)
243        else:
244            if is_async(worker):
245                a_single_process_pool_worker = partial(ProcessPool._a_single_process_pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
246                result = await a_single_process_pool_worker()
247            else:
248                partial_pool_worker = partial(ProcessPool._pool_worker_wrapper, self.partial_pool_initializer, worker, *args, **kwargs)
249                result = partial_pool_worker()
250        
251        result, exception = result
252        
253        if exception is not None:
254            raise exception
255        
256        return result