cengal.parallel_execution.multithreading.thread_workers_pool

Module Docstring Docstrings: http://www.python.org/dev/peps/pep-0257/

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19"""
 20Module Docstring
 21Docstrings: http://www.python.org/dev/peps/pep-0257/
 22"""
 23
 24
 25__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 26__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 27__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 28__license__ = "Apache License, Version 2.0"
 29__version__ = "4.4.1"
 30__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 31__email__ = "gtalk@butenkoms.space"
 32# __status__ = "Prototype"
 33__status__ = "Development"
 34# __status__ = "Production"
 35
 36
 37from cengal.introspection.inspect import get_exception
 38from typing import Callable, Any, Optional, Tuple, List, Dict, Type
 39from enum import Enum
 40import queue
 41import threading
 42import sys
 43
 44
 45ExceptionInfo = Tuple
 46
 47
 48class RequestToThread:
 49    class Command(Enum):
 50        request = 0
 51        shut_down = 1
 52
 53    def __init__(self, command_type: 'Command', request: Any):
 54        self.command_type = command_type
 55        self.request = request
 56
 57
 58class ResponseFromThread:
 59    def __init__(self, has_response: bool, response: Any, exception: Optional[ExceptionInfo] = None):
 60        self.has_response = has_response  # type: bool
 61        self.response = response          # type: Any
 62        self.exception = exception        # type: Optional[ExceptionInfo]
 63
 64
 65ThreadWorker = Callable[[RequestToThread], ResponseFromThread]
 66TypeOfThreadWorker = Type[ThreadWorker]
 67
 68
 69class ServiceThread(threading.Thread):
 70    def __init__(self, worker_type: TypeOfThreadWorker):
 71        super(ServiceThread, self).__init__(daemon=True)
 72        self.requests = queue.Queue()  # type: queue.Queue
 73        self.results = queue.Queue()   # type: queue.Queue
 74        self.worker = worker_type()    # type: ThreadWorker
 75
 76    def run(self):
 77        shut_down = False
 78        while not shut_down:
 79            request: RequestToThread = self.requests.get()
 80
 81            if RequestToThread.Command.shut_down == request.command_type:
 82                shut_down = True
 83
 84            try:
 85                response = self.worker(request)
 86            except:
 87                response = ResponseFromThread(False, None, get_exception())
 88
 89            self.results.put(response)
 90            self.requests.task_done()
 91
 92    def put(self, request: RequestToThread):
 93        self.requests.put(request)
 94
 95    def put_nowait(self, request: RequestToThread):
 96        self.requests.put_nowait(request)
 97
 98    def get(self) -> Any:
 99        response: ResponseFromThread = self.results.get()
100        self.results.task_done()
101        if response.has_response:
102            return response.response
103        elif response.exception is not None:
104            raise response.exception[1]
105        else:
106            raise RuntimeError
107
108    def get_nowait(self) -> Any:
109        response: ResponseFromThread = self.results.get_nowait()
110        self.results.task_done()
111        if response.has_response:
112            return response.response
113        elif response.exception is not None:
114            raise response.exception[1]
115        else:
116            raise RuntimeError
117
118
class ServiceThreadPool:
    """Fixed-size pool of ``ServiceThread`` objects with least-loaded dispatch.

    A request goes to the thread whose request queue reports the fewest
    unfinished tasks. Outcomes are collected by polling ``get_results()``.
    """

    def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1):
        """Create and start the pool's threads.

        :param worker_type: passed to every ``ServiceThread`` of the pool.
        :param number_of_threads: requested pool size; values below 1 become 1.
        """
        self.worker_type = worker_type
        self.number_of_threads = number_of_threads
        if self.number_of_threads < 1:
            self.number_of_threads = 1
        self.threads = list()                 # type: List[ServiceThread]
        self.thread_pending_results = dict()  # type: Dict[ServiceThread, int]
        self.pending_requests_queue = list()  # type: List[RequestToThread]
        self._init()

    def put_synchronous(self, request: RequestToThread):
        """Send ``request`` to the least-loaded thread using its blocking ``put``."""
        thread: ServiceThread = self._get_best_thread()
        thread.put(request)
        self.thread_pending_results[thread] += 1

    def put_into_pending_queue(self, request: RequestToThread):
        """Buffer ``request`` locally until ``put_pending_queue_into_work()`` is called."""
        self.pending_requests_queue.append(request)

    def put_pending_queue_into_work(self):
        """Dispatch every buffered request; those no thread accepted stay buffered."""
        buffered_requests = self.pending_requests_queue
        # Start from a fresh container of the same type so rejected requests
        # can be re-buffered while the old one is being iterated.
        self.pending_requests_queue = type(self.pending_requests_queue)()
        for pending_request in buffered_requests:
            if not self._put_impl(pending_request):
                self.pending_requests_queue.append(pending_request)

    def get_results(self) -> List[Any]:
        """Collect every outcome currently queued by threads with a non-zero pending count.

        :returns: response values in the order they were drained.
        :raises BaseException: whatever ``ServiceThread.get_nowait()`` raises for
            a failed request; values drained earlier in this call are not
            returned in that case.
        """
        responses: List[Any] = list()
        for thread, pending_responses in self.thread_pending_results.items():
            if not pending_responses:
                continue
            while True:
                try:
                    response = thread.get_nowait()
                except queue.Empty:
                    break
                except BaseException:
                    # The outcome was taken off the thread's result queue even
                    # though it is re-raised, so it must still be counted as
                    # consumed; otherwise the pending counter never drops back.
                    self.thread_pending_results[thread] -= 1
                    raise
                responses.append(response)
                self.thread_pending_results[thread] -= 1
        return responses

    def stop(self) -> List[Any]:
        """Send a shut-down command to every thread, join them and collect results.

        NOTE(review): the shut-down requests are not added to
        ``thread_pending_results``, so ``get_results()`` skips threads whose
        counter is zero even though they queued an outcome for the shut-down
        request -- confirm this is intended.
        """
        for thread in self.threads:
            thread.put(RequestToThread(RequestToThread.Command.shut_down, None))
        for thread in self.threads:
            thread.join()
        return self.get_results()

    def _init(self):
        """Create all threads first, then register and start each of them."""
        for _ in range(self.number_of_threads):
            self.threads.append(ServiceThread(self.worker_type))
        for thread in self.threads:
            self.thread_pending_results[thread] = 0
            thread.start()

    def _get_best_thread(self) -> ServiceThread:
        """Return the thread with the fewest unfinished requests (first one on ties)."""
        return min(self.threads, key=lambda thread: thread.requests.unfinished_tasks)

    def _get_threads_list(self) -> List[Tuple[int, int]]:
        """Return ``(thread index, unfinished request count)`` pairs, least loaded first."""
        thread_load = [
            (thread_index, thread.requests.unfinished_tasks)
            for thread_index, thread in enumerate(self.threads)
        ]
        # list.sort is stable, so equally loaded threads keep their index order.
        thread_load.sort(key=lambda kv: kv[1])
        return thread_load

    def _put_impl(self, request: RequestToThread) -> bool:
        """Offer ``request`` to threads from least to most loaded without blocking.

        :returns: ``True`` once a thread accepted the request, ``False`` if
            every thread's queue raised ``queue.Full``.
        """
        for thread_index, _unfinished in self._get_threads_list():
            thread = self.threads[thread_index]
            try:
                thread.put_nowait(request)
            except queue.Full:
                continue
            self.thread_pending_results[thread] += 1
            return True
        return False
ExceptionInfo = typing.Tuple
class RequestToThread:
49class RequestToThread:
50    class Command(Enum):
51        request = 0
52        shut_down = 1
53
54    def __init__(self, command_type: 'Command', request: Any):
55        self.command_type = command_type
56        self.request = request
RequestToThread( command_type: RequestToThread.Command, request: typing.Any)
54    def __init__(self, command_type: 'Command', request: Any):
55        self.command_type = command_type
56        self.request = request
command_type
request
class RequestToThread.Command(enum.Enum):
50    class Command(Enum):
51        request = 0
52        shut_down = 1

An enumeration.

request = <Command.request: 0>
shut_down = <Command.shut_down: 1>
Inherited Members
enum.Enum
name
value
class ResponseFromThread:
59class ResponseFromThread:
60    def __init__(self, has_response: bool, response: Any, exception: Optional[ExceptionInfo] = None):
61        self.has_response = has_response  # type: bool
62        self.response = response          # type: Any
63        self.exception = exception        # type: Optional[ExceptionInfo]
ResponseFromThread( has_response: bool, response: typing.Any, exception: typing.Union[typing.Tuple, NoneType] = None)
60    def __init__(self, has_response: bool, response: Any, exception: Optional[ExceptionInfo] = None):
61        self.has_response = has_response  # type: bool
62        self.response = response          # type: Any
63        self.exception = exception        # type: Optional[ExceptionInfo]
has_response
response
exception
ThreadWorker = typing.Callable[[RequestToThread], ResponseFromThread]
TypeOfThreadWorker = typing.Type[typing.Callable[[RequestToThread], ResponseFromThread]]
class ServiceThread(threading.Thread):
 70class ServiceThread(threading.Thread):
 71    def __init__(self, worker_type: TypeOfThreadWorker):
 72        super(ServiceThread, self).__init__(daemon=True)
 73        self.requests = queue.Queue()  # type: queue.Queue
 74        self.results = queue.Queue()   # type: queue.Queue
 75        self.worker = worker_type()    # type: ThreadWorker
 76
 77    def run(self):
 78        shut_down = False
 79        while not shut_down:
 80            request: RequestToThread = self.requests.get()
 81
 82            if RequestToThread.Command.shut_down == request.command_type:
 83                shut_down = True
 84
 85            try:
 86                response = self.worker(request)
 87            except:
 88                response = ResponseFromThread(False, None, get_exception())
 89
 90            self.results.put(response)
 91            self.requests.task_done()
 92
 93    def put(self, request: RequestToThread):
 94        self.requests.put(request)
 95
 96    def put_nowait(self, request: RequestToThread):
 97        self.requests.put_nowait(request)
 98
 99    def get(self) -> Any:
100        response: ResponseFromThread = self.results.get()
101        self.results.task_done()
102        if response.has_response:
103            return response.response
104        elif response.exception is not None:
105            raise response.exception[1]
106        else:
107            raise RuntimeError
108
109    def get_nowait(self) -> Any:
110        response: ResponseFromThread = self.results.get_nowait()
111        self.results.task_done()
112        if response.has_response:
113            return response.response
114        elif response.exception is not None:
115            raise response.exception[1]
116        else:
117            raise RuntimeError

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

ServiceThread( worker_type: typing.Type[typing.Callable[[RequestToThread], ResponseFromThread]])
71    def __init__(self, worker_type: TypeOfThreadWorker):
72        super(ServiceThread, self).__init__(daemon=True)
73        self.requests = queue.Queue()  # type: queue.Queue
74        self.results = queue.Queue()   # type: queue.Queue
75        self.worker = worker_type()    # type: ThreadWorker

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

requests
results
worker
def run(self):
77    def run(self):
78        shut_down = False
79        while not shut_down:
80            request: RequestToThread = self.requests.get()
81
82            if RequestToThread.Command.shut_down == request.command_type:
83                shut_down = True
84
85            try:
86                response = self.worker(request)
87            except:
88                response = ResponseFromThread(False, None, get_exception())
89
90            self.results.put(response)
91            self.requests.task_done()

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def put( self, request: RequestToThread):
93    def put(self, request: RequestToThread):
94        self.requests.put(request)
def put_nowait( self, request: RequestToThread):
96    def put_nowait(self, request: RequestToThread):
97        self.requests.put_nowait(request)
def get(self) -> Any:
 99    def get(self) -> Any:
100        response: ResponseFromThread = self.results.get()
101        self.results.task_done()
102        if response.has_response:
103            return response.response
104        elif response.exception is not None:
105            raise response.exception[1]
106        else:
107            raise RuntimeError
def get_nowait(self) -> Any:
109    def get_nowait(self) -> Any:
110        response: ResponseFromThread = self.results.get_nowait()
111        self.results.task_done()
112        if response.has_response:
113            return response.response
114        elif response.exception is not None:
115            raise response.exception[1]
116        else:
117            raise RuntimeError
Inherited Members
threading.Thread
start
join
name
ident
is_alive
isAlive
daemon
isDaemon
setDaemon
getName
setName
native_id
class ServiceThreadPool:
120class ServiceThreadPool:
121    def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1):
122        self.worker_type = worker_type
123        self.number_of_threads = number_of_threads
124        if self.number_of_threads < 1:
125            self.number_of_threads = 1
126        self.threads = list()                 # type: List[ServiceThread]
127        self.thread_pending_results = dict()  # type: Dict[ServiceThread, int]
128        self.pending_requests_queue = list()  # type: List[RequestToThread]
129        self._init()
130
131    def put_synchronous(self, request: RequestToThread):
132        thread: ServiceThread = self._get_best_thread()
133        thread.put(request)
134        self.thread_pending_results[thread] += 1
135
136    def put_into_pending_queue(self, request: RequestToThread):
137        self.pending_requests_queue.append(request)
138
139    def put_pending_queue_into_work(self):
140        buff_pending_requests_queue = self.pending_requests_queue
141        self.pending_requests_queue = type(self.pending_requests_queue)()
142        for pending_request in buff_pending_requests_queue:
143            if not self._put_impl(pending_request):
144                self.pending_requests_queue.append(pending_request)
145
146    def get_results(self) -> List[Any]:
147        responses: List[Any] = list()
148        for thread, pending_responses in self.thread_pending_results.items():
149            if pending_responses:
150                try:
151                    while True:
152                        responses.append(thread.get_nowait())
153                        self.thread_pending_results[thread] -= 1
154                except queue.Empty:
155                    pass
156        return responses
157
158    def stop(self) -> List[Any]:
159        for thread in self.threads:
160            thread.put(RequestToThread(RequestToThread.Command.shut_down, None))
161        for thread in self.threads:
162            thread.join()
163        return self.get_results()
164
165    def _init(self):
166        for index in range(self.number_of_threads):
167            self.threads.append(ServiceThread(self.worker_type))
168        for thread in self.threads:
169            self.thread_pending_results[thread] = 0
170            thread.start()
171
172    def _get_best_thread(self) -> ServiceThread:
173        thread_load: Dict = dict()
174        thread_index = -1
175        for thread in self.threads:
176            thread: ServiceThread = thread
177            thread_index += 1
178            thread_load[thread_index] = thread.requests.unfinished_tasks
179        sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1])
180        return self.threads[sorted_by_value[0][0]]
181
182    def _get_threads_list(self) -> List[Tuple[int, ServiceThread]]:
183        thread_load: Dict = dict()
184        thread_index = -1
185        for thread in self.threads:
186            thread: ServiceThread = thread
187            thread_index += 1
188            thread_load[thread_index] = thread.requests.unfinished_tasks
189        sorted_by_value = sorted(thread_load.items(), key=lambda kv: kv[1])
190        return sorted_by_value
191
192    def _put_impl(self, request: RequestToThread) -> bool:
193        is_ok = False
194        for thread_index, thread_qsize in self._get_threads_list():
195            try:
196                thread = self.threads[thread_index]
197                thread.put_nowait(request)
198                self.thread_pending_results[thread] += 1
199                is_ok = True
200                break
201            except queue.Full:
202                pass
203        return is_ok
ServiceThreadPool( worker_type: typing.Type[typing.Callable[[RequestToThread], ResponseFromThread]], number_of_threads: int = 1)
121    def __init__(self, worker_type: TypeOfThreadWorker, number_of_threads: int = 1):
122        self.worker_type = worker_type
123        self.number_of_threads = number_of_threads
124        if self.number_of_threads < 1:
125            self.number_of_threads = 1
126        self.threads = list()                 # type: List[ServiceThread]
127        self.thread_pending_results = dict()  # type: Dict[ServiceThread, int]
128        self.pending_requests_queue = list()  # type: List[RequestToThread]
129        self._init()
worker_type
number_of_threads
threads
thread_pending_results
pending_requests_queue
def put_synchronous( self, request: RequestToThread):
131    def put_synchronous(self, request: RequestToThread):
132        thread: ServiceThread = self._get_best_thread()
133        thread.put(request)
134        self.thread_pending_results[thread] += 1
def put_into_pending_queue( self, request: RequestToThread):
136    def put_into_pending_queue(self, request: RequestToThread):
137        self.pending_requests_queue.append(request)
def put_pending_queue_into_work(self):
139    def put_pending_queue_into_work(self):
140        buff_pending_requests_queue = self.pending_requests_queue
141        self.pending_requests_queue = type(self.pending_requests_queue)()
142        for pending_request in buff_pending_requests_queue:
143            if not self._put_impl(pending_request):
144                self.pending_requests_queue.append(pending_request)
def get_results(self) -> List[Any]:
146    def get_results(self) -> List[Any]:
147        responses: List[Any] = list()
148        for thread, pending_responses in self.thread_pending_results.items():
149            if pending_responses:
150                try:
151                    while True:
152                        responses.append(thread.get_nowait())
153                        self.thread_pending_results[thread] -= 1
154                except queue.Empty:
155                    pass
156        return responses
def stop(self) -> List[Any]:
158    def stop(self) -> List[Any]:
159        for thread in self.threads:
160            thread.put(RequestToThread(RequestToThread.Command.shut_down, None))
161        for thread in self.threads:
162            thread.join()
163        return self.get_results()