cengal.parallel_execution.multiprocess.multiprocessing_task_pool

 1#!/usr/bin/env python
 2# coding=utf-8
 3
 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
 5# 
 6# Licensed under the Apache License, Version 2.0 (the "License");
 7# you may not use this file except in compliance with the License.
 8# You may obtain a copy of the License at
 9# 
10#     http://www.apache.org/licenses/LICENSE-2.0
11# 
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18from .multiprocessing_task_runner import SubprocessWorker
19from cengal.data_generation.id_generator import IDGenerator
20
21"""
22Module Docstring
23Docstrings: http://www.python.org/dev/peps/pep-0257/
24"""
25
26__author__ = "ButenkoMS <gtalk@butenkoms.space>"
27__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
28__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
29__license__ = "Apache License, Version 2.0"
30__version__ = "4.4.1"
31__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
32__email__ = "gtalk@butenkoms.space"
33# __status__ = "Prototype"
34__status__ = "Development"
35# __status__ = "Production"
36
37
38class SingleTaskInfo:
39    def __init__(self, process=None, taskId=None):
40        super().__init__()
41        self.process = process
42        self.isNeedToBeClosed = False
43        self.uncompletedTasksQnt = 0  # хочу что бы оно было тут, а не в классе процесса: хочу чтобы класс процесса был
44            # thread safe без локов
45        self.taskId = taskId
46
47
48class TaskPool:
49    def __init__(self, processesQnt, working_function, initiation_function=None):
50        super().__init__()
51        self._processesQnt = processesQnt
52        self._working_function = working_function
53        self._initiation_function = initiation_function
54        self._processesIdGenerator = IDGenerator()
55        self._taskList = set()
56        self._taskDict = dict()
57        for processNumber in range(self._processesQnt):
58            taskId = self._processesIdGenerator()
59            process = SubprocessWorker(self._working_function, self._initiation_function)
60            task = SingleTaskInfo(process, taskId)
61            self._taskList.add(taskId)
62            self._taskDict[taskId] = task
63        ...
64
65    def get_processes_qnt(self):
66        return self._processesQnt
67
68    def set_processes_qnt(self, processesQnt):
69        ...
70
71    def _send_data_to_subprocess(self, inputData, processId):
72        ...
73
74    def _get_answer_from_subprocess(self, processId):
75        ...
76
77    def send_data_to_the_pool(self, inputData):
78        ...
79
80    def get_answer_from_the_pool(self):
81        ...
82
83    def send_list_of_data_to_the_pool(self, listOfData):
84        ...
85
86    def get_list_of_answers_from_the_pool(self):
87        ...
88
89    def stop_pool(self):
90        ...
91
92    ...
93
94
95
96def rpc_blocking():
97    pass
class SingleTaskInfo:
39class SingleTaskInfo:
40    def __init__(self, process=None, taskId=None):
41        super().__init__()
42        self.process = process
43        self.isNeedToBeClosed = False
44        self.uncompletedTasksQnt = 0  # хочу что бы оно было тут, а не в классе процесса: хочу чтобы класс процесса был
45            # thread safe без локов
46        self.taskId = taskId
SingleTaskInfo(process=None, taskId=None)
40    def __init__(self, process=None, taskId=None):
41        super().__init__()
42        self.process = process
43        self.isNeedToBeClosed = False
44        self.uncompletedTasksQnt = 0  # хочу что бы оно было тут, а не в классе процесса: хочу чтобы класс процесса был
45            # thread safe без локов
46        self.taskId = taskId
process
isNeedToBeClosed
uncompletedTasksQnt
taskId
class TaskPool:
49class TaskPool:
50    def __init__(self, processesQnt, working_function, initiation_function=None):
51        super().__init__()
52        self._processesQnt = processesQnt
53        self._working_function = working_function
54        self._initiation_function = initiation_function
55        self._processesIdGenerator = IDGenerator()
56        self._taskList = set()
57        self._taskDict = dict()
58        for processNumber in range(self._processesQnt):
59            taskId = self._processesIdGenerator()
60            process = SubprocessWorker(self._working_function, self._initiation_function)
61            task = SingleTaskInfo(process, taskId)
62            self._taskList.add(taskId)
63            self._taskDict[taskId] = task
64        ...
65
66    def get_processes_qnt(self):
67        return self._processesQnt
68
69    def set_processes_qnt(self, processesQnt):
70        ...
71
72    def _send_data_to_subprocess(self, inputData, processId):
73        ...
74
75    def _get_answer_from_subprocess(self, processId):
76        ...
77
78    def send_data_to_the_pool(self, inputData):
79        ...
80
81    def get_answer_from_the_pool(self):
82        ...
83
84    def send_list_of_data_to_the_pool(self, listOfData):
85        ...
86
87    def get_list_of_answers_from_the_pool(self):
88        ...
89
90    def stop_pool(self):
91        ...
92
93    ...
TaskPool(processesQnt, working_function, initiation_function=None)
50    def __init__(self, processesQnt, working_function, initiation_function=None):
51        super().__init__()
52        self._processesQnt = processesQnt
53        self._working_function = working_function
54        self._initiation_function = initiation_function
55        self._processesIdGenerator = IDGenerator()
56        self._taskList = set()
57        self._taskDict = dict()
58        for processNumber in range(self._processesQnt):
59            taskId = self._processesIdGenerator()
60            process = SubprocessWorker(self._working_function, self._initiation_function)
61            task = SingleTaskInfo(process, taskId)
62            self._taskList.add(taskId)
63            self._taskDict[taskId] = task
64        ...
def get_processes_qnt(self):
66    def get_processes_qnt(self):
67        return self._processesQnt
def set_processes_qnt(self, processesQnt):
69    def set_processes_qnt(self, processesQnt):
70        ...
def send_data_to_the_pool(self, inputData):
78    def send_data_to_the_pool(self, inputData):
79        ...
def get_answer_from_the_pool(self):
81    def get_answer_from_the_pool(self):
82        ...
def send_list_of_data_to_the_pool(self, listOfData):
84    def send_list_of_data_to_the_pool(self, listOfData):
85        ...
def get_list_of_answers_from_the_pool(self):
87    def get_list_of_answers_from_the_pool(self):
88        ...
def stop_pool(self):
90    def stop_pool(self):
91        ...
def rpc_blocking():
97def rpc_blocking():
98    pass