cengal.parallel_execution.multiprocess.multiprocessing_task_pool
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .multiprocessing_task_runner import SubprocessWorker
from cengal.data_generation.id_generator import IDGenerator

"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class SingleTaskInfo:
    """Bookkeeping record that pairs one worker object with its identifier.

    ``TaskPool.__init__`` creates one instance per worker it constructs.
    """

    def __init__(self, process=None, taskId=None):
        """Store ``process`` and ``taskId`` and reset the status fields.

        Args:
            process: The worker object this record describes (default ``None``).
            taskId: The identifier assigned to that worker (default ``None``).
        """
        super().__init__()
        self.process = process
        self.isNeedToBeClosed = False
        # Author's note (translated from Russian): the counter is kept here
        # rather than in the process class, so that the process class can be
        # thread safe without locks.
        self.uncompletedTasksQnt = 0
        self.taskId = taskId


class TaskPool:
    """Collection of ``SubprocessWorker`` objects tracked by identifier.

    Only ``__init__`` and ``get_processes_qnt`` have real bodies in this
    source; every other method is a ``...`` placeholder.
    """

    def __init__(self, processesQnt, working_function, initiation_function=None):
        """Create ``processesQnt`` workers and register each one.

        Args:
            processesQnt: How many ``SubprocessWorker`` objects to create.
            working_function: Passed as the first argument to every
                ``SubprocessWorker``.
            initiation_function: Passed as the second argument to every
                ``SubprocessWorker`` (default ``None``).
        """
        super().__init__()
        self._processesQnt = processesQnt
        self._working_function = working_function
        self._initiation_function = initiation_function
        self._processesIdGenerator = IDGenerator()
        self._taskList = set()  # identifiers of all registered workers
        self._taskDict = {}  # identifier -> SingleTaskInfo
        for _ in range(self._processesQnt):
            worker_id = self._processesIdGenerator()
            info = SingleTaskInfo(
                SubprocessWorker(self._working_function, self._initiation_function),
                worker_id,
            )
            self._taskList.add(worker_id)
            self._taskDict[worker_id] = info
        ...

    def get_processes_qnt(self):
        """Return the worker count that was passed to the constructor."""
        return self._processesQnt

    def set_processes_qnt(self, processesQnt):
        ...

    def _send_data_to_subprocess(self, inputData, processId):
        ...

    def _get_answer_from_subprocess(self, processId):
        ...

    def send_data_to_the_pool(self, inputData):
        ...

    def get_answer_from_the_pool(self):
        ...

    def send_list_of_data_to_the_pool(self, listOfData):
        ...

    def get_list_of_answers_from_the_pool(self):
        ...

    def stop_pool(self):
        ...

    ...



def rpc_blocking():
    """Placeholder: does nothing and returns ``None``."""
    pass
class
SingleTaskInfo:
class SingleTaskInfo:
    """Bookkeeping record that pairs one worker object with its identifier.

    ``TaskPool.__init__`` creates one instance per worker it constructs.
    """

    def __init__(self, process=None, taskId=None):
        """Store ``process`` and ``taskId`` and reset the status fields.

        Args:
            process: The worker object this record describes (default ``None``).
            taskId: The identifier assigned to that worker (default ``None``).
        """
        super().__init__()
        self.process = process
        self.isNeedToBeClosed = False
        # Author's note (translated from Russian): the counter is kept here
        # rather than in the process class, so that the process class can be
        # thread safe without locks.
        self.uncompletedTasksQnt = 0
        self.taskId = taskId
class
TaskPool:
class TaskPool:
    """Collection of ``SubprocessWorker`` objects tracked by identifier.

    Only ``__init__`` and ``get_processes_qnt`` have real bodies in this
    source; every other method is a ``...`` placeholder.
    """

    def __init__(self, processesQnt, working_function, initiation_function=None):
        """Create ``processesQnt`` workers and register each one.

        Args:
            processesQnt: How many ``SubprocessWorker`` objects to create.
            working_function: Passed as the first argument to every
                ``SubprocessWorker``.
            initiation_function: Passed as the second argument to every
                ``SubprocessWorker`` (default ``None``).
        """
        super().__init__()
        self._processesQnt = processesQnt
        self._working_function = working_function
        self._initiation_function = initiation_function
        self._processesIdGenerator = IDGenerator()
        self._taskList = set()  # identifiers of all registered workers
        self._taskDict = {}  # identifier -> SingleTaskInfo
        for _ in range(self._processesQnt):
            worker_id = self._processesIdGenerator()
            info = SingleTaskInfo(
                SubprocessWorker(self._working_function, self._initiation_function),
                worker_id,
            )
            self._taskList.add(worker_id)
            self._taskDict[worker_id] = info
        ...

    def get_processes_qnt(self):
        """Return the worker count that was passed to the constructor."""
        return self._processesQnt

    def set_processes_qnt(self, processesQnt):
        ...

    def _send_data_to_subprocess(self, inputData, processId):
        ...

    def _get_answer_from_subprocess(self, processId):
        ...

    def send_data_to_the_pool(self, inputData):
        ...

    def get_answer_from_the_pool(self):
        ...

    def send_list_of_data_to_the_pool(self, listOfData):
        ...

    def get_list_of_answers_from_the_pool(self):
        ...

    def stop_pool(self):
        ...

    ...
TaskPool(processesQnt, working_function, initiation_function=None)
def __init__(self, processesQnt, working_function, initiation_function=None):
    """Create ``processesQnt`` workers and register each one.

    Args:
        processesQnt: How many ``SubprocessWorker`` objects to create.
        working_function: Passed as the first argument to every
            ``SubprocessWorker``.
        initiation_function: Passed as the second argument to every
            ``SubprocessWorker`` (default ``None``).
    """
    super().__init__()
    self._processesQnt = processesQnt
    self._working_function = working_function
    self._initiation_function = initiation_function
    self._processesIdGenerator = IDGenerator()
    self._taskList = set()  # identifiers of all registered workers
    self._taskDict = {}  # identifier -> SingleTaskInfo
    for _ in range(self._processesQnt):
        worker_id = self._processesIdGenerator()
        info = SingleTaskInfo(
            SubprocessWorker(self._working_function, self._initiation_function),
            worker_id,
        )
        self._taskList.add(worker_id)
        self._taskDict[worker_id] = info
    ...
def
rpc_blocking():