cengal.parallel_execution.multiprocess.multiprocessing_task_runner

  1#!/usr/bin/env python
  2# coding=utf-8
  3
  4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
  5# 
  6# Licensed under the Apache License, Version 2.0 (the "License");
  7# you may not use this file except in compliance with the License.
  8# You may obtain a copy of the License at
  9# 
 10#     http://www.apache.org/licenses/LICENSE-2.0
 11# 
 12# Unless required by applicable law or agreed to in writing, software
 13# distributed under the License is distributed on an "AS IS" BASIS,
 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15# See the License for the specific language governing permissions and
 16# limitations under the License.
 17
 18
 19__all__ = ['SubprocessIsNotInitiatedError', 'SubprocessIsNotReadyError', 'SubprocessTerminatedError', 'Empty', 'Full', 'SendableDataType', 'Transport', 'SubprocessWorkerSettings', 'SubprocessWorker', '_subprocess_wrapper_profile', 'ExternalPipe']
 20
 21
 22import cProfile
 23# import multiprocessing
 24from multiprocessing import Process, Queue, Pipe
 25from threading import Thread
 26import sys
 27import traceback
 28from queue import Empty, Full
 29import marshal
 30import pickle
 31import os
 32from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType
 33from cengal.base.classes import BaseClassSettings
 34from cengal.time_management.cpu_clock_cycles import perf_counter
 35from cengal.introspection.inspect import pdi
 36from typing import Callable, Any, Union, Optional, Tuple, List, Dict
 37
 38# import time
 39
 40"""
 41Module Docstring
 42Docstrings: http://www.python.org/dev/peps/pep-0257/
 43"""
 44
 45__author__ = "ButenkoMS <gtalk@butenkoms.space>"
 46__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
 47__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
 48__license__ = "Apache License, Version 2.0"
 49__version__ = "4.4.1"
 50__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
 51__email__ = "gtalk@butenkoms.space"
 52# __status__ = "Prototype"
 53__status__ = "Development"
 54# __status__ = "Production"
 55
 56
 57class SubprocessIsNotInitiatedError(Exception):
 58    pass
 59
 60
 61class SubprocessIsNotReadyError(Exception):
 62    pass
 63
 64
 65class SubprocessTerminatedError(Exception):
 66    pass
 67
 68
 69class SendableDataType:
 70    pickable = 0
 71    marshalable = 1
 72    custom = 2
 73
 74
 75class Transport:
 76    queue = 0
 77    pipe = 1
 78    tcp = 2
 79
 80
 81class SubprocessWorkerSettings(BaseClassSettings):
 82    def __init__(self):
 83        self.initiation_function = None  # self.data_shelf = self.settings.initiation_function(
 84        #   self.initialization_data)
 85        self.working_function = None  # 1) 'answer = self.settings.working_function(input_data)'
 86        #       if initiation_function is None;
 87        #   2) 'answer = self.settings.working_function(self.data_shelf, input_data)' if initiation_function
 88        #       is not None.
 89        self.stopping_function = None
 90
 91        self.on_input_queue_is_too_big = None  # self.on_input_queue_is_too_big(self.data_shelf,
 92        #   average_input_size_trigger_result) where: average_input_size_trigger_result =
 93        #   self.input_queue_average_size_trigger.test_trigger(average_input_size)
 94        self.on_another_bunch_of_data_was_processed = None  # self.on_another_bunch_of_data_was_processed(
 95        #   self.data_shelf)
 96        self.on_exit = None  # on process exit; self.on_exit(self.data_shelf)
 97
 98        self.need_multithreading = False  # will use multithread mode if True; and multiprocess else.
 99        self.process_name: str = None  # str()
100        self.profile = False  # will start worker in profiling mode
101        self.initialization_data = None  # any pickable Python data
102        self.transport: Transport = None  # Transport()
103        self.tcp_settings = None  # tcp_link.TCPSettings(). Is used when (self.transport == Transport.tcp)
104        self.use_internal_subprocess_input_buffer = False  # will be able to get input data in nonblocking mode
105        self.sendable_data_type: SendableDataType = SendableDataType.pickable
106        self.sendable_data__encoder = None  # Will be in use when sendable_data_type == SendableDataType.custom.
107        #   Function will encode data in to bytes()
108        self.sendable_data__decoder = None  # Will be in use when sendable_data_type == SendableDataType.custom.
109        #   Function will decode data from bytes()
110        self.queue_to_subprocess = None  # only needed if you want to directly connect this worker with another.
111        #   For example to connect output from this worker to input of another worker.
112        self.queue_from_subprocess = None  # only needed if you want to directly connect this worker with another.
113        #   For example to connect output from this worker to input of another worker.
114        self.input_queue_average_size_trigger_limit = 30
115        self.subprocess_polling_timeout = 0.0  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
116        self.subprocess_reading_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
117        self.subprocess_writing_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
118        self.subprocess_invalidation_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
119        self.indicate_subprocess_readyness: bool = True
120
121    def check(self):
122        if self.working_function is None:
123            raise Exception('working_function cannot be None!')
124
125        if self.transport is None:
126            raise Exception('transport can\'t be None')
127        else:
128            if Transport.tcp == self.transport:
129                if self.tcp_settings is None:
130                    raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')
131                else:
132                    self.tcp_settings.check()
133
134
135class SubprocessWorker:
136    def __init__(self, settings: SubprocessWorkerSettings):
137        """ 
138        :param settings: SubprocessWorkerSettings(); you should use copy.copy(SubprocessWorkerSettings(...)) by your 
139            self if you want
140        :return: 
141        """
142        super().__init__()
143        
144        self.settings: SubprocessWorkerSettings = settings
145        self.settings.check()
146
147        self.data_shelf = None
148
149        self.subprocess_was_initiated = False
150        self.subprocess = None
151        self.queue_to_subprocess = self.settings.queue_to_subprocess
152        self.queue_from_subprocess = self.settings.queue_from_subprocess
153
154        self.list_of_subprocess_input_data: List = list()
155        self.input_size_print_sum = 0
156        self.input_size_print_counter = 0
157        self.input_size_print_counter_limit = 500
158        # self.last_log_print_time = time.time()
159        self.subprocess_readyness_indicated: bool = False
160        self.input_queue_average_size_trigger = FrontTriggerableVariable(
161            FrontTriggerableVariableType.bigger_or_equal, self.settings.input_queue_average_size_trigger_limit)
162
163    def _encode_sendable_data(self, data):
164        result = data
165        if Transport.queue == self.settings.transport:
166            if self.settings.sendable_data_type == SendableDataType.pickable:
167                pass
168            elif self.settings.sendable_data_type == SendableDataType.marshalable:
169                result = marshal.dumps(result)
170            elif self.settings.sendable_data_type == SendableDataType.custom:
171                result = self.settings.sendable_data__encoder(result)
172        else:
173            if self.settings.sendable_data_type == SendableDataType.pickable:
174                result = pickle.dumps(result)
175            elif self.settings.sendable_data_type == SendableDataType.marshalable:
176                result = marshal.dumps(result)
177            elif self.settings.sendable_data_type == SendableDataType.custom:
178                result = self.settings.sendable_data__encoder(result)
179        return result
180
181    def _decode_sendable_data(self, data):
182        result = data
183        if Transport.queue == self.settings.transport:
184            if self.settings.sendable_data_type == SendableDataType.pickable:
185                pass
186            elif self.settings.sendable_data_type == SendableDataType.marshalable:
187                result = marshal.loads(result)
188            elif self.settings.sendable_data_type == SendableDataType.custom:
189                result = self.settings.sendable_data__decoder(result)
190        else:
191            if self.settings.sendable_data_type == SendableDataType.pickable:
192                result = pickle.loads(result)
193            elif self.settings.sendable_data_type == SendableDataType.marshalable:
194                result = marshal.loads(result)
195            elif self.settings.sendable_data_type == SendableDataType.custom:
196                result = self.settings.sendable_data__decoder(result)
197        return result
198
199    def _parsing_worker_wrapper(self, input_data, stop=False):
200        # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '0')
201        exception = None
202        answer = None
203        result = None
204        try:
205            # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '1')
206            if self.settings.initiation_function is None:
207                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '2')
208                if stop:
209                    if self.settings.stopping_function is not None:
210                        answer = self.settings.stopping_function()
211                    else:
212                        answer = 'Stopped'
213                else:
214                    answer = self.settings.working_function(input_data)
215                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '3')
216            else:
217                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '4')
218                if stop:
219                    if self.settings.stopping_function is not None:
220                        answer = self.settings.stopping_function(self.data_shelf)
221                    else:
222                        answer = 'Stopped'
223                else:
224                    answer = self.settings.working_function(self.data_shelf, input_data)
225                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '5')
226        except:
227            # # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '6')
228            # exception = sys.exc_info()
229            # formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
230            # exception = exception[:2] + (formatted_traceback,)
231            # answer = (input_data[0], None)
232            # # print(self.settings.process_name)
233            # # print(input_data)
234            # # print(exception)
235            answer = None
236            exception = pickle.dumps(sys.exc_info())
237
238        # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '7')
239        if (answer is not None) or (exception is not None):
240            # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '8')
241            result = (answer, exception)
242
243        # print('<<===', self.settings.process_name, '_parsing_worker_wrapper')
244        return result
245
246    def _subprocess_wrapper(self):
247        try:
248            # input_from_parent_process_queue = self.queue_to_subprocess
249            # output_to_parent_process_queue = self.queue_from_subprocess
250
251            # print(' STARTED:', self.settings.process_name, '; PID:', os.getpid())
252
253            input_from_parent_process_queue = None
254            output_to_parent_process_queue = None
255            if Transport.pipe == self.settings.transport:
256                input_from_parent_process_queue = self.queue_to_subprocess[1]
257                output_to_parent_process_queue = self.queue_from_subprocess[1]
258            elif Transport.queue == self.settings.transport:
259                input_from_parent_process_queue = self.queue_to_subprocess
260                output_to_parent_process_queue = self.queue_from_subprocess
261
262            if self.settings.indicate_subprocess_readyness:
263                result = (True, ('Started', None))
264                result = self._encode_sendable_data(result)
265                if Transport.pipe == self.settings.transport:
266                    output_to_parent_process_queue.send_bytes(result)
267                elif Transport.queue == self.settings.transport:
268                    output_to_parent_process_queue.put(result)
269
270            if self.settings.initiation_function is not None:
271                self.data_shelf = self.settings.initiation_function(self.settings.initialization_data)
272            while True:
273                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '0')
274                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '1', 'qsize:',
275                #       input_from_parent_process_queue.qsize())
276
277                # input_size = input_from_parent_process_queue.qsize()
278                # if input_size > 3:
279                #     print('===>>', self.settings.process_name, 'input_size:', input_size)
280                # output_size = output_to_parent_process_queue.qsize()
281                # if output_size > 3:
282                #     print('===>>', self.settings.process_name, 'output_size:', output_size)
283
284                data = None
285                if self.settings.use_internal_subprocess_input_buffer:
286                    while True:
287                        another_chunk_of_data = None
288                        if Transport.pipe == self.settings.transport:
289                            if input_from_parent_process_queue.poll():
290                                another_chunk_of_data = input_from_parent_process_queue.recv_bytes()
291                            else:
292                                break
293                        elif Transport.queue == self.settings.transport:
294                            if not input_from_parent_process_queue.empty():
295                                another_chunk_of_data = input_from_parent_process_queue.get()
296                            else:
297                                break
298                        
299                        if another_chunk_of_data:
300                            self.list_of_subprocess_input_data.append(another_chunk_of_data)
301
302                    input_size = len(self.list_of_subprocess_input_data)
303                    self.input_size_print_counter += 1
304                    self.input_size_print_sum += input_size
305                    if self.input_size_print_counter >= self.input_size_print_counter_limit:
306                        average_input_size = self.input_size_print_sum / self.input_size_print_counter
307                        self.input_size_print_sum = 0
308                        self.input_size_print_counter = 0
309                        average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(
310                            average_input_size)
311                        if average_input_size_trigger_result is not None:
312                            # if average_input_size_trigger_result:
313                            #     print('===>>', self.settings.process_name, 'average_input_size:', average_input_size)
314                            # else:
315                            #     print('===>>', self.settings.process_name, 'average_input_size is OK:', average_input_size)
316
317                            if self.settings.on_input_queue_is_too_big is not None:
318                                self.settings.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result)
319
320                    # data = input_from_parent_process_queue.get(block=False, timeout=self.settings.subprocess_reading_timeout)
321                    if len(self.list_of_subprocess_input_data) > 0:
322                        data = self.list_of_subprocess_input_data[0]
323                        # print('===>>', self.settings.process_name, 'input_data:', data)
324                        self.list_of_subprocess_input_data = self.list_of_subprocess_input_data[1:]
325                    else:
326                        continue
327                else:
328                    if Transport.pipe == self.settings.transport:
329                        data = input_from_parent_process_queue.recv_bytes()
330                    elif Transport.queue == self.settings.transport:
331                        input_size = input_from_parent_process_queue.qsize()
332                        self.input_size_print_counter += 1
333                        self.input_size_print_sum += input_size
334                        if self.input_size_print_counter >= self.input_size_print_counter_limit:
335                            average_input_size = self.input_size_print_sum / self.input_size_print_counter
336                            self.input_size_print_sum = 0
337                            self.input_size_print_counter = 0
338                            average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(
339                                average_input_size)
340                            if average_input_size_trigger_result is not None:
341                                # if average_input_size_trigger_result:
342                                #     print('===>>', self.settings.process_name, 'average_input_size for Queue:',
343                                #         average_input_size)
344                                # else:
345                                #     print('===>>', self.settings.process_name, 'average_input_size for Queue is OK:',
346                                #         average_input_size)
347                                
348                                if self.settings.on_input_queue_is_too_big is not None:
349                                    self.settings.on_input_queue_is_too_big(self.data_shelf,
350                                                                            average_input_size_trigger_result)
351
352                        try:
353                            data = input_from_parent_process_queue.get(block=True)
354                        except Empty:
355                            pass
356
357                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '2')
358
359                # current_time = time.time()
360                # if (current_time - self.last_log_print_time) > 2:
361                #     print('===>>', self.settings.process_name, '_subprocess_wrapper')
362
363                if data is None:
364                    continue
365
366                is_result_was_send = False
367                is_worker_is_finalized = False
368                is_need_to_break_loop = False
369
370                data = self._decode_sendable_data(data)
371                # data = marshal.loads(data)
372                continue_processing: bool = data[0]
373                if continue_processing:
374                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '3')
375                    data_with_exception = data[1]
376                    data_only = data_with_exception[0]
377                    result = self._parsing_worker_wrapper(data_only)
378                    if result is not None:
379                        # print('===>>', self.settings.process_name, 'output_result:', result)
380                        # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
381                        result = (True, result)
382                        result = self._encode_sendable_data(result)
383                        # result = marshal.dumps(result)
384                        # output_to_parent_process_queue.put(result)
385                        if Transport.pipe == self.settings.transport:
386                            output_to_parent_process_queue.send_bytes(result)
387                        elif Transport.queue == self.settings.transport:
388                            output_to_parent_process_queue.put(result)
389
390                        is_result_was_send = True
391                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
392                else:
393                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '6')
394
395                    data_only = (None, None)
396                    self._parsing_worker_wrapper(data_only, stop=True)
397                    if result is not None:
398                        # print('===>>', self.settings.process_name, 'output_result:', result)
399                        # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
400                        result = (False, result)
401                        result = self._encode_sendable_data(result)
402                        # result = marshal.dumps(result)
403                        # output_to_parent_process_queue.put(result)
404                        if Transport.pipe == self.settings.transport:
405                            output_to_parent_process_queue.send_bytes(result)
406                        elif Transport.queue == self.settings.transport:
407                            output_to_parent_process_queue.put(result)
408
409                        is_result_was_send = True
410
411                    is_worker_is_finalized = True
412                    is_need_to_break_loop = True
413                    # break
414
415                if continue_processing and (self.settings.on_another_bunch_of_data_was_processed is not None) and is_result_was_send:
416                    self.settings.on_another_bunch_of_data_was_processed(self.data_shelf)
417
418                if (self.settings.on_exit is not None) and is_worker_is_finalized:
419                    self.settings.on_exit(self.data_shelf)
420
421                if is_need_to_break_loop:
422                    break
423
424                # print('<<===', self.settings.process_name, '_subprocess_wrapper')
425
426                # if (current_time - self.last_log_print_time) > 2:
427                #     print('<<===', self.settings.process_name, '_subprocess_wrapper')
428                #     self.last_log_print_time = current_time
429
430            # print(' ENDED:', self.settings.process_name, '; PID:', os.getpid())
431            # print('<<===>>', self.settings.process_name, '_subprocess_wrapper')
432        except BrokenPipeError:
433            pass
434        except OSError:
435            pass
436        except ValueError:
437            pass
438        finally:
439            if Transport.pipe == self.settings.transport:
440                input_from_parent_process_queue = self.queue_to_subprocess[1]
441                output_to_parent_process_queue = self.queue_from_subprocess[1]
442            elif Transport.queue == self.settings.transport:
443                input_from_parent_process_queue = self.queue_to_subprocess
444                output_to_parent_process_queue = self.queue_from_subprocess
445            
446            input_from_parent_process_queue.close()
447            output_to_parent_process_queue.close()
448            
449
450    def start(self, wait_for_process_readyness: bool = True):
451        if not self.subprocess_was_initiated:
452            if self.queue_to_subprocess is None:
453                if Transport.pipe == self.settings.transport:
454                    self.queue_to_subprocess = Pipe()
455                elif Transport.queue == self.settings.transport:
456                    self.queue_to_subprocess = Queue()
457
458            if self.queue_from_subprocess is None:
459                if Transport.pipe == self.settings.transport:
460                    self.queue_from_subprocess = Pipe()
461                elif Transport.queue == self.settings.transport:
462                    self.queue_from_subprocess = Queue()
463
464            target = None
465            arguments = None
466            if self.settings.profile:
467                target = _subprocess_wrapper_profile
468                arguments = (self,)
469            else:
470                target = self._subprocess_wrapper
471                arguments = tuple()
472            self.subprocess = None
473            if self.settings.need_multithreading:
474                self.subprocess = Thread(target=target, args=arguments, daemon=True)
475            else:
476                self.subprocess = Process(target=target, args=arguments, daemon=True)
477            
478            self.subprocess.start()
479            self.subprocess_was_initiated = True
480        
481        if wait_for_process_readyness:
482            self.wait_for_subprocess_readines(block=True)
483    
484    def wait_for_subprocess_readines(self, block: bool = True):
485        if not self.subprocess_was_initiated:
486            raise SubprocessIsNotInitiatedError
487        
488        if self.subprocess_readyness_indicated:
489            return
490
491        if self.settings.indicate_subprocess_readyness:
492            self.subprocess_readyness_indicated = True
493            try:
494                try:
495                    self.get_answer_from_subprocess(block=block)
496                    self.subprocess_readyness_indicated = True
497                except Empty:
498                    raise SubprocessIsNotReadyError
499            except:
500                self.subprocess_readyness_indicated = False
501                raise
502    
503    def _close_connections(self):
504        if self.subprocess_was_initiated:
505            if Transport.pipe == self.settings.transport:
506                self.queue_to_subprocess[0].close()
507                self.queue_from_subprocess[0].close()
508            elif Transport.queue == self.settings.transport:
509                self.queue_to_subprocess.close()
510                self.queue_from_subprocess.close()
511
512    def stop(self):
513        if not self.subprocess_was_initiated:
514            return
515        
516        self.wait_for_subprocess_readines(block=True)
517
518        data = (False, (None, None))
519        data = self._encode_sendable_data(data)
520        # data = marshal.dumps(data)
521
522        output_to_subprocess_queue = None
523        if Transport.pipe == self.settings.transport:
524            output_to_subprocess_queue = self.queue_to_subprocess[0]
525        elif Transport.queue == self.settings.transport:
526            output_to_subprocess_queue = self.queue_to_subprocess
527
528        if Transport.pipe == self.settings.transport:
529            output_to_subprocess_queue.send_bytes(data)
530        elif Transport.queue == self.settings.transport:
531            output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
532        
533        try:
534            self.get_answer_from_subprocess(block=True)
535        except Empty:
536            pass
537        except SubprocessTerminatedError:
538            pass
539        finally:
540            self._close_connections()
541            self.subprocess_was_initiated = False
542            self.subprocess.join()
543    
544    def _invalidate(self):
545        self._close_connections()
546        self.subprocess_was_initiated = False
547        self.subprocess.join(self.settings.subprocess_invalidation_timeout)
548
549    def send_data_to_subprocess(self, input_data, block: bool = True):
550        """
551        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
552            the OS) may raise a ValueError exception
553        :param input_data:
554        :return:
555        """
556        if not self.subprocess_was_initiated:
557            raise SubprocessIsNotInitiatedError
558
559        self.wait_for_subprocess_readines(block=block)
560        
561        data = (True, (input_data, None))
562        data = self._encode_sendable_data(data)
563        # data = marshal.dumps(data)
564        need_to_retry = True
565        while need_to_retry:
566            subprocess_disconnected_or_terminated: bool = False
567            try:
568                output_to_subprocess_queue = None
569                if Transport.pipe == self.settings.transport:
570                    output_to_subprocess_queue = self.queue_to_subprocess[0]
571                elif Transport.queue == self.settings.transport:
572                    output_to_subprocess_queue = self.queue_to_subprocess
573
574                if Transport.pipe == self.settings.transport:
575                    output_to_subprocess_queue.send_bytes(data)
576                elif Transport.queue == self.settings.transport:
577                    output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
578                
579                need_to_retry = False
580            except OSError:
581                subprocess_disconnected_or_terminated = True
582            except ValueError:
583                subprocess_disconnected_or_terminated = True
584            except Full:
585                need_to_retry = block
586            
587            if subprocess_disconnected_or_terminated:
588                self._invalidate()
589                raise SubprocessTerminatedError
590
591    def is_input_queue_is_empty(self):
592        if not self.subprocess_was_initiated:
593            raise SubprocessIsNotInitiatedError
594
595        self.wait_for_subprocess_readines(block=False)
596        
597        result = None
598        subprocess_disconnected_or_terminated: bool = False
599        try:
600            output_to_subprocess_queue = None
601            if Transport.pipe == self.settings.transport:
602                output_to_subprocess_queue = self.queue_to_subprocess[0]
603            elif Transport.queue == self.settings.transport:
604                output_to_subprocess_queue = self.queue_to_subprocess
605
606            if Transport.pipe == self.settings.transport:
607                result = not output_to_subprocess_queue.poll(timeout=0.0)
608            elif Transport.queue == self.settings.transport:
609                result = output_to_subprocess_queue.empty()
610
611            # result = self.queue_to_subprocess.empty()
612        except OSError:
613            subprocess_disconnected_or_terminated = True
614        except ValueError:
615            subprocess_disconnected_or_terminated = True
616        
617        if subprocess_disconnected_or_terminated:
618            self._invalidate()
619            raise SubprocessTerminatedError
620
621        return result
622
623    def wait_for_data(self, timeout: Optional[Union[float, int]] = None):
624        start_time = perf_counter()
625        while not self.is_input_queue_is_empty():
626            if timeout is not None:
627                if (perf_counter() - start_time) >= timeout:
628                    break
629
630    def get_answer_from_subprocess(self, block=True):
631        """
632        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
633            the OS) may raise a ValueError exception
634        Will raise Empty() in non-blocking mode when queue is empty
635        :param block:
636        :param time_out:  None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
637        :return:
638        """
639        if not self.subprocess_was_initiated:
640            raise SubprocessIsNotInitiatedError
641
642        self.wait_for_subprocess_readines(block=block)
643        
644        subprocess_continue_working = None
645        answer = None
646        subprocess_disconnected_or_terminated: bool = False
647        try:
648            if Transport.pipe == self.settings.transport:
649                input_from_subprocess_queue = self.queue_from_subprocess[0]
650                if block:
651                    subprocess_answer = input_from_subprocess_queue.recv_bytes()
652                    subprocess_answer = self._decode_sendable_data(subprocess_answer)
653                    # subprocess_answer = marshal.loads(subprocess_answer)
654                    subprocess_continue_working, answer = subprocess_answer
655                else:
656                    if input_from_subprocess_queue.poll(timeout=0.0):
657                        subprocess_answer = input_from_subprocess_queue.recv_bytes()
658                        subprocess_answer = self._decode_sendable_data(subprocess_answer)
659                        # subprocess_answer = marshal.loads(subprocess_answer)
660                        subprocess_continue_working, answer = subprocess_answer
661                    else:
662                        raise Empty()
663            elif Transport.queue == self.settings.transport:
664                input_from_subprocess_queue = self.queue_from_subprocess
665                subprocess_answer = input_from_subprocess_queue.get(block=block, timeout=self.settings.subprocess_reading_timeout)
666                subprocess_answer = self._decode_sendable_data(subprocess_answer)
667                # subprocess_answer = marshal.loads(subprocess_answer)
668                subprocess_continue_working, answer = subprocess_answer
669            
670            if not subprocess_continue_working:
671                subprocess_disconnected_or_terminated = True
672        except OSError:
673            subprocess_disconnected_or_terminated = True
674        except ValueError:
675            subprocess_disconnected_or_terminated = True
676        
677        if subprocess_disconnected_or_terminated:
678            self._invalidate()
679            raise SubprocessTerminatedError
680        
681        exception = answer[1]
682        result = answer[0]
683        if exception is not None:
684            exception = pickle.loads(exception)
685            # print(self.settings.process_name)
686            # print(result)
687            # print(exception)
688            # print()
689            # print(' <<< SUBPROCESS EXCEPTION:')
690            # trace = ''
691            # for line in exception[2]:
692            #     trace += line
693            # print(trace, file=sys.stderr)
694            # print(exception[0])
695            # print(exception[1])
696            # print(' >>>')
697
698            exc_type, exc_value, exc_tb = exception
699            raise exc_value.with_traceback(exc_tb)
700        
701        return result
702
703
704def _subprocess_wrapper_profile(process_data):
705    printable_name = process_data.settings.process_name.replace(' ', '_') + '.prof'
706    cProfile.runctx('process_data._subprocess_wrapper()', globals(), locals(), printable_name)
707
708
class ExternalPipe:
    """A multiprocessing Pipe() connection pair together with the same pair in swapped order.

    pipe          - (first_connection, second_connection) as returned by Pipe();
    inverted_pipe - (second_connection, first_connection).
    """

    def __init__(self):
        first_connection, second_connection = connection_pair = Pipe()
        self.pipe = connection_pair
        self.inverted_pipe = (second_connection, first_connection)
class SubprocessIsNotInitiatedError(builtins.Exception):
class SubprocessIsNotInitiatedError(Exception):
    """Raised when a worker operation is requested before the subprocess was started."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SubprocessIsNotReadyError(builtins.Exception):
class SubprocessIsNotReadyError(Exception):
    """Raised when the subprocess has not yet reported that it is ready."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SubprocessTerminatedError(builtins.Exception):
class SubprocessTerminatedError(Exception):
    """Raised when the subprocess has stopped or its transport is no longer usable."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Empty(builtins.Exception):

Exception raised by Queue.get(block=0)/get_nowait().

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Full(builtins.Exception):
class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait()."""

Exception raised by Queue.put(block=0)/put_nowait().

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SendableDataType:
class SendableDataType:
    """Integer codes selecting how payloads are serialised for the transport.

    ``pickable`` selects pickle, ``marshalable`` selects marshal and ``custom``
    selects the user-supplied encoder/decoder from the worker settings.
    """

    pickable, marshalable, custom = range(3)
pickable = 0
marshalable = 1
custom = 2
class Transport:
class Transport:
    """Integer codes naming the channel type used between parent and worker."""

    queue, pipe, tcp = range(3)
queue = 0
pipe = 1
tcp = 2
class SubprocessWorkerSettings(cengal.base.classes.versions.v_0.classes.BaseClassSettings):
 82class SubprocessWorkerSettings(BaseClassSettings):
 83    def __init__(self):
 84        self.initiation_function = None  # self.data_shelf = self.settings.initiation_function(
 85        #   self.initialization_data)
 86        self.working_function = None  # 1) 'answer = self.settings.working_function(input_data)'
 87        #       if initiation_function is None;
 88        #   2) 'answer = self.settings.working_function(self.data_shelf, input_data)' if initiation_function
 89        #       is not None.
 90        self.stopping_function = None
 91
 92        self.on_input_queue_is_too_big = None  # self.on_input_queue_is_too_big(self.data_shelf,
 93        #   average_input_size_trigger_result) where: average_input_size_trigger_result =
 94        #   self.input_queue_average_size_trigger.test_trigger(average_input_size)
 95        self.on_another_bunch_of_data_was_processed = None  # self.on_another_bunch_of_data_was_processed(
 96        #   self.data_shelf)
 97        self.on_exit = None  # on process exit; self.on_exit(self.data_shelf)
 98
 99        self.need_multithreading = False  # will use multithread mode if True; and multiprocess else.
100        self.process_name: str = None  # str()
101        self.profile = False  # will start worker in profiling mode
102        self.initialization_data = None  # any pickable Python data
103        self.transport: Transport = None  # Transport()
104        self.tcp_settings = None  # tcp_link.TCPSettings(). Is used when (self.transport == Transport.tcp)
105        self.use_internal_subprocess_input_buffer = False  # will be able to get input data in nonblocking mode
106        self.sendable_data_type: SendableDataType = SendableDataType.pickable
107        self.sendable_data__encoder = None  # Will be in use when sendable_data_type == SendableDataType.custom.
108        #   Function will encode data in to bytes()
109        self.sendable_data__decoder = None  # Will be in use when sendable_data_type == SendableDataType.custom.
110        #   Function will decode data from bytes()
111        self.queue_to_subprocess = None  # only needed if you want to directly connect this worker with another.
112        #   For example to connect output from this worker to input of another worker.
113        self.queue_from_subprocess = None  # only needed if you want to directly connect this worker with another.
114        #   For example to connect output from this worker to input of another worker.
115        self.input_queue_average_size_trigger_limit = 30
116        self.subprocess_polling_timeout = 0.0  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
117        self.subprocess_reading_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
118        self.subprocess_writing_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
119        self.subprocess_invalidation_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
120        self.indicate_subprocess_readyness: bool = True
121
122    def check(self):
123        if self.working_function is None:
124            raise Exception('working_function cannot be None!')
125
126        if self.transport is None:
127            raise Exception('transport can\'t be None')
128        else:
129            if Transport.tcp == self.transport:
130                if self.tcp_settings is None:
131                    raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')
132                else:
133                    self.tcp_settings.check()
initiation_function
working_function
stopping_function
on_input_queue_is_too_big
on_another_bunch_of_data_was_processed
on_exit
need_multithreading
process_name: str
profile
initialization_data
transport: Transport
tcp_settings
use_internal_subprocess_input_buffer
sendable_data_type: SendableDataType
sendable_data__encoder
sendable_data__decoder
queue_to_subprocess
queue_from_subprocess
input_queue_average_size_trigger_limit
subprocess_polling_timeout
subprocess_reading_timeout
subprocess_writing_timeout
subprocess_invalidation_timeout
indicate_subprocess_readyness: bool
def check(self):
122    def check(self):
123        if self.working_function is None:
124            raise Exception('working_function cannot be None!')
125
126        if self.transport is None:
127            raise Exception('transport can\'t be None')
128        else:
129            if Transport.tcp == self.transport:
130                if self.tcp_settings is None:
131                    raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')
132                else:
133                    self.tcp_settings.check()
class SubprocessWorker:
136class SubprocessWorker:
137    def __init__(self, settings: SubprocessWorkerSettings):
138        """ 
139        :param settings: SubprocessWorkerSettings(); you should use copy.copy(SubprocessWorkerSettings(...)) by your 
140            self if you want
141        :return: 
142        """
143        super().__init__()
144        
145        self.settings: SubprocessWorkerSettings = settings
146        self.settings.check()
147
148        self.data_shelf = None
149
150        self.subprocess_was_initiated = False
151        self.subprocess = None
152        self.queue_to_subprocess = self.settings.queue_to_subprocess
153        self.queue_from_subprocess = self.settings.queue_from_subprocess
154
155        self.list_of_subprocess_input_data: List = list()
156        self.input_size_print_sum = 0
157        self.input_size_print_counter = 0
158        self.input_size_print_counter_limit = 500
159        # self.last_log_print_time = time.time()
160        self.subprocess_readyness_indicated: bool = False
161        self.input_queue_average_size_trigger = FrontTriggerableVariable(
162            FrontTriggerableVariableType.bigger_or_equal, self.settings.input_queue_average_size_trigger_limit)
163
164    def _encode_sendable_data(self, data):
165        result = data
166        if Transport.queue == self.settings.transport:
167            if self.settings.sendable_data_type == SendableDataType.pickable:
168                pass
169            elif self.settings.sendable_data_type == SendableDataType.marshalable:
170                result = marshal.dumps(result)
171            elif self.settings.sendable_data_type == SendableDataType.custom:
172                result = self.settings.sendable_data__encoder(result)
173        else:
174            if self.settings.sendable_data_type == SendableDataType.pickable:
175                result = pickle.dumps(result)
176            elif self.settings.sendable_data_type == SendableDataType.marshalable:
177                result = marshal.dumps(result)
178            elif self.settings.sendable_data_type == SendableDataType.custom:
179                result = self.settings.sendable_data__encoder(result)
180        return result
181
182    def _decode_sendable_data(self, data):
183        result = data
184        if Transport.queue == self.settings.transport:
185            if self.settings.sendable_data_type == SendableDataType.pickable:
186                pass
187            elif self.settings.sendable_data_type == SendableDataType.marshalable:
188                result = marshal.loads(result)
189            elif self.settings.sendable_data_type == SendableDataType.custom:
190                result = self.settings.sendable_data__decoder(result)
191        else:
192            if self.settings.sendable_data_type == SendableDataType.pickable:
193                result = pickle.loads(result)
194            elif self.settings.sendable_data_type == SendableDataType.marshalable:
195                result = marshal.loads(result)
196            elif self.settings.sendable_data_type == SendableDataType.custom:
197                result = self.settings.sendable_data__decoder(result)
198        return result
199
200    def _parsing_worker_wrapper(self, input_data, stop=False):
201        # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '0')
202        exception = None
203        answer = None
204        result = None
205        try:
206            # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '1')
207            if self.settings.initiation_function is None:
208                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '2')
209                if stop:
210                    if self.settings.stopping_function is not None:
211                        answer = self.settings.stopping_function()
212                    else:
213                        answer = 'Stopped'
214                else:
215                    answer = self.settings.working_function(input_data)
216                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '3')
217            else:
218                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '4')
219                if stop:
220                    if self.settings.stopping_function is not None:
221                        answer = self.settings.stopping_function(self.data_shelf)
222                    else:
223                        answer = 'Stopped'
224                else:
225                    answer = self.settings.working_function(self.data_shelf, input_data)
226                # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '5')
227        except:
228            # # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '6')
229            # exception = sys.exc_info()
230            # formatted_traceback = traceback.format_exception(exception[0], exception[1], exception[2])
231            # exception = exception[:2] + (formatted_traceback,)
232            # answer = (input_data[0], None)
233            # # print(self.settings.process_name)
234            # # print(input_data)
235            # # print(exception)
236            answer = None
237            exception = pickle.dumps(sys.exc_info())
238
239        # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '7')
240        if (answer is not None) or (exception is not None):
241            # print('===>>', self.settings.process_name, '_parsing_worker_wrapper', '8')
242            result = (answer, exception)
243
244        # print('<<===', self.settings.process_name, '_parsing_worker_wrapper')
245        return result
246
247    def _subprocess_wrapper(self):
248        try:
249            # input_from_parent_process_queue = self.queue_to_subprocess
250            # output_to_parent_process_queue = self.queue_from_subprocess
251
252            # print(' STARTED:', self.settings.process_name, '; PID:', os.getpid())
253
254            input_from_parent_process_queue = None
255            output_to_parent_process_queue = None
256            if Transport.pipe == self.settings.transport:
257                input_from_parent_process_queue = self.queue_to_subprocess[1]
258                output_to_parent_process_queue = self.queue_from_subprocess[1]
259            elif Transport.queue == self.settings.transport:
260                input_from_parent_process_queue = self.queue_to_subprocess
261                output_to_parent_process_queue = self.queue_from_subprocess
262
263            if self.settings.indicate_subprocess_readyness:
264                result = (True, ('Started', None))
265                result = self._encode_sendable_data(result)
266                if Transport.pipe == self.settings.transport:
267                    output_to_parent_process_queue.send_bytes(result)
268                elif Transport.queue == self.settings.transport:
269                    output_to_parent_process_queue.put(result)
270
271            if self.settings.initiation_function is not None:
272                self.data_shelf = self.settings.initiation_function(self.settings.initialization_data)
273            while True:
274                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '0')
275                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '1', 'qsize:',
276                #       input_from_parent_process_queue.qsize())
277
278                # input_size = input_from_parent_process_queue.qsize()
279                # if input_size > 3:
280                #     print('===>>', self.settings.process_name, 'input_size:', input_size)
281                # output_size = output_to_parent_process_queue.qsize()
282                # if output_size > 3:
283                #     print('===>>', self.settings.process_name, 'output_size:', output_size)
284
285                data = None
286                if self.settings.use_internal_subprocess_input_buffer:
287                    while True:
288                        another_chunk_of_data = None
289                        if Transport.pipe == self.settings.transport:
290                            if input_from_parent_process_queue.poll():
291                                another_chunk_of_data = input_from_parent_process_queue.recv_bytes()
292                            else:
293                                break
294                        elif Transport.queue == self.settings.transport:
295                            if not input_from_parent_process_queue.empty():
296                                another_chunk_of_data = input_from_parent_process_queue.get()
297                            else:
298                                break
299                        
300                        if another_chunk_of_data:
301                            self.list_of_subprocess_input_data.append(another_chunk_of_data)
302
303                    input_size = len(self.list_of_subprocess_input_data)
304                    self.input_size_print_counter += 1
305                    self.input_size_print_sum += input_size
306                    if self.input_size_print_counter >= self.input_size_print_counter_limit:
307                        average_input_size = self.input_size_print_sum / self.input_size_print_counter
308                        self.input_size_print_sum = 0
309                        self.input_size_print_counter = 0
310                        average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(
311                            average_input_size)
312                        if average_input_size_trigger_result is not None:
313                            # if average_input_size_trigger_result:
314                            #     print('===>>', self.settings.process_name, 'average_input_size:', average_input_size)
315                            # else:
316                            #     print('===>>', self.settings.process_name, 'average_input_size is OK:', average_input_size)
317
318                            if self.settings.on_input_queue_is_too_big is not None:
319                                self.settings.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result)
320
321                    # data = input_from_parent_process_queue.get(block=False, timeout=self.settings.subprocess_reading_timeout)
322                    if len(self.list_of_subprocess_input_data) > 0:
323                        data = self.list_of_subprocess_input_data[0]
324                        # print('===>>', self.settings.process_name, 'input_data:', data)
325                        self.list_of_subprocess_input_data = self.list_of_subprocess_input_data[1:]
326                    else:
327                        continue
328                else:
329                    if Transport.pipe == self.settings.transport:
330                        data = input_from_parent_process_queue.recv_bytes()
331                    elif Transport.queue == self.settings.transport:
332                        input_size = input_from_parent_process_queue.qsize()
333                        self.input_size_print_counter += 1
334                        self.input_size_print_sum += input_size
335                        if self.input_size_print_counter >= self.input_size_print_counter_limit:
336                            average_input_size = self.input_size_print_sum / self.input_size_print_counter
337                            self.input_size_print_sum = 0
338                            self.input_size_print_counter = 0
339                            average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(
340                                average_input_size)
341                            if average_input_size_trigger_result is not None:
342                                # if average_input_size_trigger_result:
343                                #     print('===>>', self.settings.process_name, 'average_input_size for Queue:',
344                                #         average_input_size)
345                                # else:
346                                #     print('===>>', self.settings.process_name, 'average_input_size for Queue is OK:',
347                                #         average_input_size)
348                                
349                                if self.settings.on_input_queue_is_too_big is not None:
350                                    self.settings.on_input_queue_is_too_big(self.data_shelf,
351                                                                            average_input_size_trigger_result)
352
353                        try:
354                            data = input_from_parent_process_queue.get(block=True)
355                        except Empty:
356                            pass
357
358                # print('===>>', self.settings.process_name, '_subprocess_wrapper', '2')
359
360                # current_time = time.time()
361                # if (current_time - self.last_log_print_time) > 2:
362                #     print('===>>', self.settings.process_name, '_subprocess_wrapper')
363
364                if data is None:
365                    continue
366
367                is_result_was_send = False
368                is_worker_is_finalized = False
369                is_need_to_break_loop = False
370
371                data = self._decode_sendable_data(data)
372                # data = marshal.loads(data)
373                continue_processing: bool = data[0]
374                if continue_processing:
375                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '3')
376                    data_with_exception = data[1]
377                    data_only = data_with_exception[0]
378                    result = self._parsing_worker_wrapper(data_only)
379                    if result is not None:
380                        # print('===>>', self.settings.process_name, 'output_result:', result)
381                        # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
382                        result = (True, result)
383                        result = self._encode_sendable_data(result)
384                        # result = marshal.dumps(result)
385                        # output_to_parent_process_queue.put(result)
386                        if Transport.pipe == self.settings.transport:
387                            output_to_parent_process_queue.send_bytes(result)
388                        elif Transport.queue == self.settings.transport:
389                            output_to_parent_process_queue.put(result)
390
391                        is_result_was_send = True
392                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
393                else:
394                    # print('===>>', self.settings.process_name, '_subprocess_wrapper', '6')
395
396                    data_only = (None, None)
397                    self._parsing_worker_wrapper(data_only, stop=True)
398                    if result is not None:
399                        # print('===>>', self.settings.process_name, 'output_result:', result)
400                        # print('===>>', self.settings.process_name, '_subprocess_wrapper', '5')
401                        result = (False, result)
402                        result = self._encode_sendable_data(result)
403                        # result = marshal.dumps(result)
404                        # output_to_parent_process_queue.put(result)
405                        if Transport.pipe == self.settings.transport:
406                            output_to_parent_process_queue.send_bytes(result)
407                        elif Transport.queue == self.settings.transport:
408                            output_to_parent_process_queue.put(result)
409
410                        is_result_was_send = True
411
412                    is_worker_is_finalized = True
413                    is_need_to_break_loop = True
414                    # break
415
416                if continue_processing and (self.settings.on_another_bunch_of_data_was_processed is not None) and is_result_was_send:
417                    self.settings.on_another_bunch_of_data_was_processed(self.data_shelf)
418
419                if (self.settings.on_exit is not None) and is_worker_is_finalized:
420                    self.settings.on_exit(self.data_shelf)
421
422                if is_need_to_break_loop:
423                    break
424
425                # print('<<===', self.settings.process_name, '_subprocess_wrapper')
426
427                # if (current_time - self.last_log_print_time) > 2:
428                #     print('<<===', self.settings.process_name, '_subprocess_wrapper')
429                #     self.last_log_print_time = current_time
430
431            # print(' ENDED:', self.settings.process_name, '; PID:', os.getpid())
432            # print('<<===>>', self.settings.process_name, '_subprocess_wrapper')
433        except BrokenPipeError:
434            pass
435        except OSError:
436            pass
437        except ValueError:
438            pass
439        finally:
440            if Transport.pipe == self.settings.transport:
441                input_from_parent_process_queue = self.queue_to_subprocess[1]
442                output_to_parent_process_queue = self.queue_from_subprocess[1]
443            elif Transport.queue == self.settings.transport:
444                input_from_parent_process_queue = self.queue_to_subprocess
445                output_to_parent_process_queue = self.queue_from_subprocess
446            
447            input_from_parent_process_queue.close()
448            output_to_parent_process_queue.close()
449            
450
451    def start(self, wait_for_process_readyness: bool = True):
452        if not self.subprocess_was_initiated:
453            if self.queue_to_subprocess is None:
454                if Transport.pipe == self.settings.transport:
455                    self.queue_to_subprocess = Pipe()
456                elif Transport.queue == self.settings.transport:
457                    self.queue_to_subprocess = Queue()
458
459            if self.queue_from_subprocess is None:
460                if Transport.pipe == self.settings.transport:
461                    self.queue_from_subprocess = Pipe()
462                elif Transport.queue == self.settings.transport:
463                    self.queue_from_subprocess = Queue()
464
465            target = None
466            arguments = None
467            if self.settings.profile:
468                target = _subprocess_wrapper_profile
469                arguments = (self,)
470            else:
471                target = self._subprocess_wrapper
472                arguments = tuple()
473            self.subprocess = None
474            if self.settings.need_multithreading:
475                self.subprocess = Thread(target=target, args=arguments, daemon=True)
476            else:
477                self.subprocess = Process(target=target, args=arguments, daemon=True)
478            
479            self.subprocess.start()
480            self.subprocess_was_initiated = True
481        
482        if wait_for_process_readyness:
483            self.wait_for_subprocess_readines(block=True)
484    
485    def wait_for_subprocess_readines(self, block: bool = True):
486        if not self.subprocess_was_initiated:
487            raise SubprocessIsNotInitiatedError
488        
489        if self.subprocess_readyness_indicated:
490            return
491
492        if self.settings.indicate_subprocess_readyness:
493            self.subprocess_readyness_indicated = True
494            try:
495                try:
496                    self.get_answer_from_subprocess(block=block)
497                    self.subprocess_readyness_indicated = True
498                except Empty:
499                    raise SubprocessIsNotReadyError
500            except:
501                self.subprocess_readyness_indicated = False
502                raise
503    
504    def _close_connections(self):
505        if self.subprocess_was_initiated:
506            if Transport.pipe == self.settings.transport:
507                self.queue_to_subprocess[0].close()
508                self.queue_from_subprocess[0].close()
509            elif Transport.queue == self.settings.transport:
510                self.queue_to_subprocess.close()
511                self.queue_from_subprocess.close()
512
513    def stop(self):
514        if not self.subprocess_was_initiated:
515            return
516        
517        self.wait_for_subprocess_readines(block=True)
518
519        data = (False, (None, None))
520        data = self._encode_sendable_data(data)
521        # data = marshal.dumps(data)
522
523        output_to_subprocess_queue = None
524        if Transport.pipe == self.settings.transport:
525            output_to_subprocess_queue = self.queue_to_subprocess[0]
526        elif Transport.queue == self.settings.transport:
527            output_to_subprocess_queue = self.queue_to_subprocess
528
529        if Transport.pipe == self.settings.transport:
530            output_to_subprocess_queue.send_bytes(data)
531        elif Transport.queue == self.settings.transport:
532            output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
533        
534        try:
535            self.get_answer_from_subprocess(block=True)
536        except Empty:
537            pass
538        except SubprocessTerminatedError:
539            pass
540        finally:
541            self._close_connections()
542            self.subprocess_was_initiated = False
543            self.subprocess.join()
544    
545    def _invalidate(self):
546        self._close_connections()
547        self.subprocess_was_initiated = False
548        self.subprocess.join(self.settings.subprocess_invalidation_timeout)
549
550    def send_data_to_subprocess(self, input_data, block: bool = True):
551        """
552        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
553            the OS) may raise a ValueError exception
554        :param input_data:
555        :return:
556        """
557        if not self.subprocess_was_initiated:
558            raise SubprocessIsNotInitiatedError
559
560        self.wait_for_subprocess_readines(block=block)
561        
562        data = (True, (input_data, None))
563        data = self._encode_sendable_data(data)
564        # data = marshal.dumps(data)
565        need_to_retry = True
566        while need_to_retry:
567            subprocess_disconnected_or_terminated: bool = False
568            try:
569                output_to_subprocess_queue = None
570                if Transport.pipe == self.settings.transport:
571                    output_to_subprocess_queue = self.queue_to_subprocess[0]
572                elif Transport.queue == self.settings.transport:
573                    output_to_subprocess_queue = self.queue_to_subprocess
574
575                if Transport.pipe == self.settings.transport:
576                    output_to_subprocess_queue.send_bytes(data)
577                elif Transport.queue == self.settings.transport:
578                    output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
579                
580                need_to_retry = False
581            except OSError:
582                subprocess_disconnected_or_terminated = True
583            except ValueError:
584                subprocess_disconnected_or_terminated = True
585            except Full:
586                need_to_retry = block
587            
588            if subprocess_disconnected_or_terminated:
589                self._invalidate()
590                raise SubprocessTerminatedError
591
592    def is_input_queue_is_empty(self):
593        if not self.subprocess_was_initiated:
594            raise SubprocessIsNotInitiatedError
595
596        self.wait_for_subprocess_readines(block=False)
597        
598        result = None
599        subprocess_disconnected_or_terminated: bool = False
600        try:
601            output_to_subprocess_queue = None
602            if Transport.pipe == self.settings.transport:
603                output_to_subprocess_queue = self.queue_to_subprocess[0]
604            elif Transport.queue == self.settings.transport:
605                output_to_subprocess_queue = self.queue_to_subprocess
606
607            if Transport.pipe == self.settings.transport:
608                result = not output_to_subprocess_queue.poll(timeout=0.0)
609            elif Transport.queue == self.settings.transport:
610                result = output_to_subprocess_queue.empty()
611
612            # result = self.queue_to_subprocess.empty()
613        except OSError:
614            subprocess_disconnected_or_terminated = True
615        except ValueError:
616            subprocess_disconnected_or_terminated = True
617        
618        if subprocess_disconnected_or_terminated:
619            self._invalidate()
620            raise SubprocessTerminatedError
621
622        return result
623
624    def wait_for_data(self, timeout: Optional[Union[float, int]] = None):
625        start_time = perf_counter()
626        while not self.is_input_queue_is_empty():
627            if timeout is not None:
628                if (perf_counter() - start_time) >= timeout:
629                    break
630
631    def get_answer_from_subprocess(self, block=True):
632        """
633        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
634            the OS) may raise a ValueError exception
635        Will raise Empty() in non-blocking mode when queue is empty
636        :param block:
637        :param time_out:  None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
638        :return:
639        """
640        if not self.subprocess_was_initiated:
641            raise SubprocessIsNotInitiatedError
642
643        self.wait_for_subprocess_readines(block=block)
644        
645        subprocess_continue_working = None
646        answer = None
647        subprocess_disconnected_or_terminated: bool = False
648        try:
649            if Transport.pipe == self.settings.transport:
650                input_from_subprocess_queue = self.queue_from_subprocess[0]
651                if block:
652                    subprocess_answer = input_from_subprocess_queue.recv_bytes()
653                    subprocess_answer = self._decode_sendable_data(subprocess_answer)
654                    # subprocess_answer = marshal.loads(subprocess_answer)
655                    subprocess_continue_working, answer = subprocess_answer
656                else:
657                    if input_from_subprocess_queue.poll(timeout=0.0):
658                        subprocess_answer = input_from_subprocess_queue.recv_bytes()
659                        subprocess_answer = self._decode_sendable_data(subprocess_answer)
660                        # subprocess_answer = marshal.loads(subprocess_answer)
661                        subprocess_continue_working, answer = subprocess_answer
662                    else:
663                        raise Empty()
664            elif Transport.queue == self.settings.transport:
665                input_from_subprocess_queue = self.queue_from_subprocess
666                subprocess_answer = input_from_subprocess_queue.get(block=block, timeout=self.settings.subprocess_reading_timeout)
667                subprocess_answer = self._decode_sendable_data(subprocess_answer)
668                # subprocess_answer = marshal.loads(subprocess_answer)
669                subprocess_continue_working, answer = subprocess_answer
670            
671            if not subprocess_continue_working:
672                subprocess_disconnected_or_terminated = True
673        except OSError:
674            subprocess_disconnected_or_terminated = True
675        except ValueError:
676            subprocess_disconnected_or_terminated = True
677        
678        if subprocess_disconnected_or_terminated:
679            self._invalidate()
680            raise SubprocessTerminatedError
681        
682        exception = answer[1]
683        result = answer[0]
684        if exception is not None:
685            exception = pickle.loads(exception)
686            # print(self.settings.process_name)
687            # print(result)
688            # print(exception)
689            # print()
690            # print(' <<< SUBPROCESS EXCEPTION:')
691            # trace = ''
692            # for line in exception[2]:
693            #     trace += line
694            # print(trace, file=sys.stderr)
695            # print(exception[0])
696            # print(exception[1])
697            # print(' >>>')
698
699            exc_type, exc_value, exc_tb = exception
700            raise exc_value.with_traceback(exc_tb)
701        
702        return result
SubprocessWorker( settings: SubprocessWorkerSettings)
137    def __init__(self, settings: SubprocessWorkerSettings):
138        """ 
139        :param settings: SubprocessWorkerSettings(); you should use copy.copy(SubprocessWorkerSettings(...)) by your 
140            self if you want
141        :return: 
142        """
143        super().__init__()
144        
145        self.settings: SubprocessWorkerSettings = settings
146        self.settings.check()
147
148        self.data_shelf = None
149
150        self.subprocess_was_initiated = False
151        self.subprocess = None
152        self.queue_to_subprocess = self.settings.queue_to_subprocess
153        self.queue_from_subprocess = self.settings.queue_from_subprocess
154
155        self.list_of_subprocess_input_data: List = list()
156        self.input_size_print_sum = 0
157        self.input_size_print_counter = 0
158        self.input_size_print_counter_limit = 500
159        # self.last_log_print_time = time.time()
160        self.subprocess_readyness_indicated: bool = False
161        self.input_queue_average_size_trigger = FrontTriggerableVariable(
162            FrontTriggerableVariableType.bigger_or_equal, self.settings.input_queue_average_size_trigger_limit)

:param settings: SubprocessWorkerSettings(); use copy.copy(SubprocessWorkerSettings(...)) yourself if you want to pass a copy :return:

data_shelf
subprocess_was_initiated
subprocess
queue_to_subprocess
queue_from_subprocess
list_of_subprocess_input_data: List
input_size_print_sum
input_size_print_counter
input_size_print_counter_limit
subprocess_readyness_indicated: bool
input_queue_average_size_trigger
def start(self, wait_for_process_readyness: bool = True):
451    def start(self, wait_for_process_readyness: bool = True):
452        if not self.subprocess_was_initiated:
453            if self.queue_to_subprocess is None:
454                if Transport.pipe == self.settings.transport:
455                    self.queue_to_subprocess = Pipe()
456                elif Transport.queue == self.settings.transport:
457                    self.queue_to_subprocess = Queue()
458
459            if self.queue_from_subprocess is None:
460                if Transport.pipe == self.settings.transport:
461                    self.queue_from_subprocess = Pipe()
462                elif Transport.queue == self.settings.transport:
463                    self.queue_from_subprocess = Queue()
464
465            target = None
466            arguments = None
467            if self.settings.profile:
468                target = _subprocess_wrapper_profile
469                arguments = (self,)
470            else:
471                target = self._subprocess_wrapper
472                arguments = tuple()
473            self.subprocess = None
474            if self.settings.need_multithreading:
475                self.subprocess = Thread(target=target, args=arguments, daemon=True)
476            else:
477                self.subprocess = Process(target=target, args=arguments, daemon=True)
478            
479            self.subprocess.start()
480            self.subprocess_was_initiated = True
481        
482        if wait_for_process_readyness:
483            self.wait_for_subprocess_readines(block=True)
def wait_for_subprocess_readines(self, block: bool = True):
485    def wait_for_subprocess_readines(self, block: bool = True):
486        if not self.subprocess_was_initiated:
487            raise SubprocessIsNotInitiatedError
488        
489        if self.subprocess_readyness_indicated:
490            return
491
492        if self.settings.indicate_subprocess_readyness:
493            self.subprocess_readyness_indicated = True
494            try:
495                try:
496                    self.get_answer_from_subprocess(block=block)
497                    self.subprocess_readyness_indicated = True
498                except Empty:
499                    raise SubprocessIsNotReadyError
500            except:
501                self.subprocess_readyness_indicated = False
502                raise
def stop(self):
513    def stop(self):
514        if not self.subprocess_was_initiated:
515            return
516        
517        self.wait_for_subprocess_readines(block=True)
518
519        data = (False, (None, None))
520        data = self._encode_sendable_data(data)
521        # data = marshal.dumps(data)
522
523        output_to_subprocess_queue = None
524        if Transport.pipe == self.settings.transport:
525            output_to_subprocess_queue = self.queue_to_subprocess[0]
526        elif Transport.queue == self.settings.transport:
527            output_to_subprocess_queue = self.queue_to_subprocess
528
529        if Transport.pipe == self.settings.transport:
530            output_to_subprocess_queue.send_bytes(data)
531        elif Transport.queue == self.settings.transport:
532            output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
533        
534        try:
535            self.get_answer_from_subprocess(block=True)
536        except Empty:
537            pass
538        except SubprocessTerminatedError:
539            pass
540        finally:
541            self._close_connections()
542            self.subprocess_was_initiated = False
543            self.subprocess.join()
def send_data_to_subprocess(self, input_data, block: bool = True):
550    def send_data_to_subprocess(self, input_data, block: bool = True):
551        """
552        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
553            the OS) may raise a ValueError exception
554        :param input_data:
555        :return:
556        """
557        if not self.subprocess_was_initiated:
558            raise SubprocessIsNotInitiatedError
559
560        self.wait_for_subprocess_readines(block=block)
561        
562        data = (True, (input_data, None))
563        data = self._encode_sendable_data(data)
564        # data = marshal.dumps(data)
565        need_to_retry = True
566        while need_to_retry:
567            subprocess_disconnected_or_terminated: bool = False
568            try:
569                output_to_subprocess_queue = None
570                if Transport.pipe == self.settings.transport:
571                    output_to_subprocess_queue = self.queue_to_subprocess[0]
572                elif Transport.queue == self.settings.transport:
573                    output_to_subprocess_queue = self.queue_to_subprocess
574
575                if Transport.pipe == self.settings.transport:
576                    output_to_subprocess_queue.send_bytes(data)
577                elif Transport.queue == self.settings.transport:
578                    output_to_subprocess_queue.put(data, timeout=self.settings.subprocess_writing_timeout)
579                
580                need_to_retry = False
581            except OSError:
582                subprocess_disconnected_or_terminated = True
583            except ValueError:
584                subprocess_disconnected_or_terminated = True
585            except Full:
586                need_to_retry = block
587            
588            if subprocess_disconnected_or_terminated:
589                self._invalidate()
590                raise SubprocessTerminatedError

If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception :param input_data: :return:

def is_input_queue_is_empty(self):
592    def is_input_queue_is_empty(self):
593        if not self.subprocess_was_initiated:
594            raise SubprocessIsNotInitiatedError
595
596        self.wait_for_subprocess_readines(block=False)
597        
598        result = None
599        subprocess_disconnected_or_terminated: bool = False
600        try:
601            output_to_subprocess_queue = None
602            if Transport.pipe == self.settings.transport:
603                output_to_subprocess_queue = self.queue_to_subprocess[0]
604            elif Transport.queue == self.settings.transport:
605                output_to_subprocess_queue = self.queue_to_subprocess
606
607            if Transport.pipe == self.settings.transport:
608                result = not output_to_subprocess_queue.poll(timeout=0.0)
609            elif Transport.queue == self.settings.transport:
610                result = output_to_subprocess_queue.empty()
611
612            # result = self.queue_to_subprocess.empty()
613        except OSError:
614            subprocess_disconnected_or_terminated = True
615        except ValueError:
616            subprocess_disconnected_or_terminated = True
617        
618        if subprocess_disconnected_or_terminated:
619            self._invalidate()
620            raise SubprocessTerminatedError
621
622        return result
def wait_for_data(self, timeout: Union[int, float, NoneType] = None):
624    def wait_for_data(self, timeout: Optional[Union[float, int]] = None):
625        start_time = perf_counter()
626        while not self.is_input_queue_is_empty():
627            if timeout is not None:
628                if (perf_counter() - start_time) >= timeout:
629                    break
def get_answer_from_subprocess(self, block=True):
631    def get_answer_from_subprocess(self, block=True):
632        """
633        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
634            the OS) may raise a ValueError exception
635        Will raise Empty() in non-blocking mode when queue is empty
636        :param block:
637        :param time_out:  None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
638        :return:
639        """
640        if not self.subprocess_was_initiated:
641            raise SubprocessIsNotInitiatedError
642
643        self.wait_for_subprocess_readines(block=block)
644        
645        subprocess_continue_working = None
646        answer = None
647        subprocess_disconnected_or_terminated: bool = False
648        try:
649            if Transport.pipe == self.settings.transport:
650                input_from_subprocess_queue = self.queue_from_subprocess[0]
651                if block:
652                    subprocess_answer = input_from_subprocess_queue.recv_bytes()
653                    subprocess_answer = self._decode_sendable_data(subprocess_answer)
654                    # subprocess_answer = marshal.loads(subprocess_answer)
655                    subprocess_continue_working, answer = subprocess_answer
656                else:
657                    if input_from_subprocess_queue.poll(timeout=0.0):
658                        subprocess_answer = input_from_subprocess_queue.recv_bytes()
659                        subprocess_answer = self._decode_sendable_data(subprocess_answer)
660                        # subprocess_answer = marshal.loads(subprocess_answer)
661                        subprocess_continue_working, answer = subprocess_answer
662                    else:
663                        raise Empty()
664            elif Transport.queue == self.settings.transport:
665                input_from_subprocess_queue = self.queue_from_subprocess
666                subprocess_answer = input_from_subprocess_queue.get(block=block, timeout=self.settings.subprocess_reading_timeout)
667                subprocess_answer = self._decode_sendable_data(subprocess_answer)
668                # subprocess_answer = marshal.loads(subprocess_answer)
669                subprocess_continue_working, answer = subprocess_answer
670            
671            if not subprocess_continue_working:
672                subprocess_disconnected_or_terminated = True
673        except OSError:
674            subprocess_disconnected_or_terminated = True
675        except ValueError:
676            subprocess_disconnected_or_terminated = True
677        
678        if subprocess_disconnected_or_terminated:
679            self._invalidate()
680            raise SubprocessTerminatedError
681        
682        exception = answer[1]
683        result = answer[0]
684        if exception is not None:
685            exception = pickle.loads(exception)
686            # print(self.settings.process_name)
687            # print(result)
688            # print(exception)
689            # print()
690            # print(' <<< SUBPROCESS EXCEPTION:')
691            # trace = ''
692            # for line in exception[2]:
693            #     trace += line
694            # print(trace, file=sys.stderr)
695            # print(exception[0])
696            # print(exception[1])
697            # print(' >>>')
698
699            exc_type, exc_value, exc_tb = exception
700            raise exc_value.with_traceback(exc_tb)
701        
702        return result

If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception Will raise Empty() in non-blocking mode when queue is empty :param block: :param time_out: None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds :return:

def _subprocess_wrapper_profile(process_data):
def _subprocess_wrapper_profile(process_data):
    """
    Run process_data._subprocess_wrapper() under cProfile and save the statistics to a file.

    The file name is process_data.settings.process_name with spaces replaced by underscores plus '.prof'.

    :param process_data: object which provides settings.process_name and _subprocess_wrapper()
    :return:
    """
    printable_name = '{}.prof'.format(process_data.settings.process_name.replace(' ', '_'))
    # The profiled statement finds process_data by name in locals().
    cProfile.runctx('process_data._subprocess_wrapper()', globals(), locals(), printable_name)
class ExternalPipe:
class ExternalPipe:
    """
    A Pipe() connection pair kept together with the same pair in swapped order.

    pipe - the (first, second) pair of connections as returned by Pipe()
    inverted_pipe - the same connections as (second, first)
    """

    def __init__(self):
        first_end, second_end = Pipe()
        self.pipe = (first_end, second_end)
        self.inverted_pipe = (second_end, first_end)
pipe
inverted_pipe