cengal.parallel_execution.multithreading.thread_workers_pool
Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19""" 20Module Docstring 21Docstrings: http://www.python.org/dev/peps/pep-0257/ 22""" 23 24 25__author__ = "ButenkoMS <gtalk@butenkoms.space>" 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 28__license__ = "Apache License, Version 2.0" 29__version__ = "4.4.1" 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 31__email__ = "gtalk@butenkoms.space" 32# __status__ = "Prototype" 33__status__ = "Development" 34# __status__ = "Production" 35 36 37from cengal.introspection.inspect import get_exception 38from typing import Callable, Any, Optional, Tuple, List, Dict, Type 39from enum import Enum 40import queue 41import threading 42import sys 43 44 45ExceptionInfo = Tuple 46 47 48class RequestToThread: 49 class Command(Enum): 50 request = 0 51 shut_down = 1 52 53 def __init__(self, command_type: 'Command', request: Any): 54 self.command_type = command_type 55 self.request = request 56 57 58class ResponseFromThread: 59 def __init__(self, has_response: bool, response: Any, exception: Optional[ExceptionInfo] = None): 60 self.has_response = has_response # type: bool 61 self.response = response # type: Any 62 self.exception = exception # type: Optional[ExceptionInfo] 63 64 65ThreadWorker = Callable[[RequestToThread], ResponseFromThread] 66TypeOfThreadWorker = Type[ThreadWorker] 67 68 69class ServiceThread(threading.Thread): 70 def __init__(self, worker_type: TypeOfThreadWorker): 71 super(ServiceThread, self).__init__(daemon=True) 72 self.requests = queue.Queue() # type: queue.Queue 73 self.results = queue.Queue() # type: queue.Queue 74 self.worker = worker_type() # type: ThreadWorker 75 76 def run(self): 77 shut_down = False 78 while not shut_down: 79 request: RequestToThread = self.requests.get() 80 81 if RequestToThread.Command.shut_down == request.command_type: 82 shut_down = True 83 84 try: 85 response = self.worker(request) 86 except: 87 response = ResponseFromThread(False, None, get_exception()) 88 89 self.results.put(response) 90 self.requests.task_done() 91 92 def put(self, request: RequestToThread): 93 self.requests.put(request) 94 95 def put_nowait(self, request: RequestToThread): 96 self.requests.put_nowait(request) 97 98 def get(self) -> Any: 99 response: ResponseFromThread = self.results.get() 100 self.results.task_done() 101 if response.has_response: 102 return response.response 103 elif response.exception is not None: 104 raise response.exception[1] 105 else: 106 raise RuntimeError 107 108 def get_nowait(self) -> Any: 109 response: ResponseFromThread = self.results.get_nowait() 110 self.results.task_done() 111 if response.has_response: 112 return response.response 113 elif response.exception is not None: 114 raise response.exception[1] 115 else: 116 raise RuntimeError 117 118 119class ServiceThreadPool: 120 def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1): 121 self.worker_type = worker_type 122 self.number_of_threads = number_of_threads 123 if self.number_of_threads < 1: 124 self.number_of_threads = 1 125 self.threads = list() # type: List[ServiceThread] 126 self.thread_pending_results = dict() # type: Dict[ServiceThread, int] 127 self.pending_requests_queue = list() # type: List[RequestToThread] 128 self._init() 129 130 def put_synchronous(self, request: RequestToThread): 131 thread: ServiceThread = self._get_best_thread() 132 thread.put(request) 133 self.thread_pending_results[thread] += 1 134 135 def put_into_pending_queue(self, request: RequestToThread): 136 self.pending_requests_queue.append(request) 137 138 def put_pending_queue_into_work(self): 139 buff_pending_requests_queue = self.pending_requests_queue 140 self.pending_requests_queue = type(self.pending_requests_queue)() 141 for pending_request in buff_pending_requests_queue: 142 if not self._put_impl(pending_request): 143 self.pending_requests_queue.append(pending_request) 144 145 def get_results(self) -> List[Any]: 146 responses: List[Any] = list() 147 for thread, pending_responses in self.thread_pending_results.items(): 148 if pending_responses: 149 try: 150 while True: 151 responses.append(thread.get_nowait()) 152 self.thread_pending_results[thread] -= 1 153 except queue.Empty: 154 pass 155 return responses 156 157 def stop(self) -> List[Any]: 158 for thread in self.threads: 159 thread.put(RequestToThread(RequestToThread.Command.shut_down, None)) 160 for thread in self.threads: 161 thread.join() 162 return self.get_results() 163 164 def _init(self): 165 for index in range(self.number_of_threads): 166 self.threads.append(ServiceThread(self.worker_type)) 167 for thread in self.threads: 168 self.thread_pending_results[thread] = 0 169 thread.start() 170 171 def _get_best_thread(self) -> ServiceThread: 172 thread_load: Dict = dict() 173 thread_index = -1 174 for thread in self.threads: 175 thread: ServiceThread = thread 176 thread_index += 1 177 thread_load[thread_index] = thread.requests.unfinished_tasks 178 sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1]) 179 return self.threads[sorted_by_value[0][0]] 180 181 def _get_threads_list(self) -> List[Tuple[int, ServiceThread]]: 182 thread_load: Dict = dict() 183 thread_index = -1 184 for thread in self.threads: 185 thread: ServiceThread = thread 186 thread_index += 1 187 thread_load[thread_index] = thread.requests.unfinished_tasks 188 sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1]) 189 return sorted_by_value 190 191 def _put_impl(self, request: RequestToThread) -> bool: 192 is_ok = False 193 for thread_index, thread_qsize in self._get_threads_list(): 194 try: 195 thread = self.threads[thread_index] 196 thread.put_nowait(request) 197 self.thread_pending_results[thread] += 1 198 is_ok = True 199 break 200 except queue.Full: 201 pass 202 return is_ok
49class RequestToThread: 50 class Command(Enum): 51 request = 0 52 shut_down = 1 53 54 def __init__(self, command_type: 'Command', request: Any): 55 self.command_type = command_type 56 self.request = request
An enumeration.
Inherited Members
- enum.Enum
- name
- value
59class ResponseFromThread: 60 def __init__(self, has_response: bool, response: Any, exception: Optional[ExceptionInfo] = None): 61 self.has_response = has_response # type: bool 62 self.response = response # type: Any 63 self.exception = exception # type: Optional[ExceptionInfo]
70class ServiceThread(threading.Thread): 71 def __init__(self, worker_type: TypeOfThreadWorker): 72 super(ServiceThread, self).__init__(daemon=True) 73 self.requests = queue.Queue() # type: queue.Queue 74 self.results = queue.Queue() # type: queue.Queue 75 self.worker = worker_type() # type: ThreadWorker 76 77 def run(self): 78 shut_down = False 79 while not shut_down: 80 request: RequestToThread = self.requests.get() 81 82 if RequestToThread.Command.shut_down == request.command_type: 83 shut_down = True 84 85 try: 86 response = self.worker(request) 87 except: 88 response = ResponseFromThread(False, None, get_exception()) 89 90 self.results.put(response) 91 self.requests.task_done() 92 93 def put(self, request: RequestToThread): 94 self.requests.put(request) 95 96 def put_nowait(self, request: RequestToThread): 97 self.requests.put_nowait(request) 98 99 def get(self) -> Any: 100 response: ResponseFromThread = self.results.get() 101 self.results.task_done() 102 if response.has_response: 103 return response.response 104 elif response.exception is not None: 105 raise response.exception[1] 106 else: 107 raise RuntimeError 108 109 def get_nowait(self) -> Any: 110 response: ResponseFromThread = self.results.get_nowait() 111 self.results.task_done() 112 if response.has_response: 113 return response.response 114 elif response.exception is not None: 115 raise response.exception[1] 116 else: 117 raise RuntimeError
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
71 def __init__(self, worker_type: TypeOfThreadWorker): 72 super(ServiceThread, self).__init__(daemon=True) 73 self.requests = queue.Queue() # type: queue.Queue 74 self.results = queue.Queue() # type: queue.Queue 75 self.worker = worker_type() # type: ThreadWorker
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
77 def run(self): 78 shut_down = False 79 while not shut_down: 80 request: RequestToThread = self.requests.get() 81 82 if RequestToThread.Command.shut_down == request.command_type: 83 shut_down = True 84 85 try: 86 response = self.worker(request) 87 except: 88 response = ResponseFromThread(False, None, get_exception()) 89 90 self.results.put(response) 91 self.requests.task_done()
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- isAlive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
120class ServiceThreadPool: 121 def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1): 122 self.worker_type = worker_type 123 self.number_of_threads = number_of_threads 124 if self.number_of_threads < 1: 125 self.number_of_threads = 1 126 self.threads = list() # type: List[ServiceThread] 127 self.thread_pending_results = dict() # type: Dict[ServiceThread, int] 128 self.pending_requests_queue = list() # type: List[RequestToThread] 129 self._init() 130 131 def put_synchronous(self, request: RequestToThread): 132 thread: ServiceThread = self._get_best_thread() 133 thread.put(request) 134 self.thread_pending_results[thread] += 1 135 136 def put_into_pending_queue(self, request: RequestToThread): 137 self.pending_requests_queue.append(request) 138 139 def put_pending_queue_into_work(self): 140 buff_pending_requests_queue = self.pending_requests_queue 141 self.pending_requests_queue = type(self.pending_requests_queue)() 142 for pending_request in buff_pending_requests_queue: 143 if not self._put_impl(pending_request): 144 self.pending_requests_queue.append(pending_request) 145 146 def get_results(self) -> List[Any]: 147 responses: List[Any] = list() 148 for thread, pending_responses in self.thread_pending_results.items(): 149 if pending_responses: 150 try: 151 while True: 152 responses.append(thread.get_nowait()) 153 self.thread_pending_results[thread] -= 1 154 except queue.Empty: 155 pass 156 return responses 157 158 def stop(self) -> List[Any]: 159 for thread in self.threads: 160 thread.put(RequestToThread(RequestToThread.Command.shut_down, None)) 161 for thread in self.threads: 162 thread.join() 163 return self.get_results() 164 165 def _init(self): 166 for index in range(self.number_of_threads): 167 self.threads.append(ServiceThread(self.worker_type)) 168 for thread in self.threads: 169 self.thread_pending_results[thread] = 0 170 thread.start() 171 172 def _get_best_thread(self) -> ServiceThread: 173 thread_load: Dict = dict() 174 thread_index = -1 175 for thread in self.threads: 176 thread: ServiceThread = thread 177 thread_index += 1 178 thread_load[thread_index] = thread.requests.unfinished_tasks 179 sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1]) 180 return self.threads[sorted_by_value[0][0]] 181 182 def _get_threads_list(self) -> List[Tuple[int, ServiceThread]]: 183 thread_load: Dict = dict() 184 thread_index = -1 185 for thread in self.threads: 186 thread: ServiceThread = thread 187 thread_index += 1 188 thread_load[thread_index] = thread.requests.unfinished_tasks 189 sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1]) 190 return sorted_by_value 191 192 def _put_impl(self, request: RequestToThread) -> bool: 193 is_ok = False 194 for thread_index, thread_qsize in self._get_threads_list(): 195 try: 196 thread = self.threads[thread_index] 197 thread.put_nowait(request) 198 self.thread_pending_results[thread] += 1 199 is_ok = True 200 break 201 except queue.Full: 202 pass 203 return is_ok
121 def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1): 122 self.worker_type = worker_type 123 self.number_of_threads = number_of_threads 124 if self.number_of_threads < 1: 125 self.number_of_threads = 1 126 self.threads = list() # type: List[ServiceThread] 127 self.thread_pending_results = dict() # type: Dict[ServiceThread, int] 128 self.pending_requests_queue = list() # type: List[RequestToThread] 129 self._init()
139 def put_pending_queue_into_work(self): 140 buff_pending_requests_queue = self.pending_requests_queue 141 self.pending_requests_queue = type(self.pending_requests_queue)() 142 for pending_request in buff_pending_requests_queue: 143 if not self._put_impl(pending_request): 144 self.pending_requests_queue.append(pending_request)
146 def get_results(self) -> List[Any]: 147 responses: List[Any] = list() 148 for thread, pending_responses in self.thread_pending_results.items(): 149 if pending_responses: 150 try: 151 while True: 152 responses.append(thread.get_nowait()) 153 self.thread_pending_results[thread] -= 1 154 except queue.Empty: 155 pass 156 return responses