cengal.parallel_execution.multiprocess.multiprocessing_task_runner
#!/usr/bin/env python
# coding=utf-8

# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


__all__ = ['SubprocessIsNotInitiatedError', 'SubprocessIsNotReadyError', 'SubprocessTerminatedError', 'Empty', 'Full',
           'SendableDataType', 'Transport', 'SubprocessWorkerSettings', 'SubprocessWorker',
           '_subprocess_wrapper_profile', 'ExternalPipe']


import cProfile
from multiprocessing import Process, Queue, Pipe
from threading import Thread
import sys
import traceback
from queue import Empty, Full
import marshal
import pickle
import os
from cengal.data_manipulation.front_triggerable_variable import FrontTriggerableVariable, FrontTriggerableVariableType
from cengal.base.classes import BaseClassSettings
from cengal.time_management.cpu_clock_cycles import perf_counter
from cengal.introspection.inspect import pdi
from typing import Callable, Any, Union, Optional, Tuple, List, Dict

"""
Module Docstring
Docstrings: http://www.python.org/dev/peps/pep-0257/
"""

__author__ = "ButenkoMS <gtalk@butenkoms.space>"
__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>"
__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ]
__license__ = "Apache License, Version 2.0"
__version__ = "4.4.1"
__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>"
__email__ = "gtalk@butenkoms.space"
# __status__ = "Prototype"
__status__ = "Development"
# __status__ = "Production"


class SubprocessIsNotInitiatedError(Exception):
    """Raised when a worker method is called before `SubprocessWorker.start()`."""
    pass


class SubprocessIsNotReadyError(Exception):
    """Raised when the readiness message has not arrived from the worker yet."""
    pass


class SubprocessTerminatedError(Exception):
    """Raised when the worker reported termination or its channel failed."""
    pass


class SendableDataType:
    """Serialization applied to every message exchanged with the worker."""
    pickable = 0
    marshalable = 1
    custom = 2


class Transport:
    """Channel kind used between the parent and the worker."""
    queue = 0
    pipe = 1
    tcp = 2


class SubprocessWorkerSettings(BaseClassSettings):
    """Configuration consumed by `SubprocessWorker`."""

    def __init__(self):
        # self.data_shelf = self.settings.initiation_function(self.initialization_data)
        self.initiation_function = None
        # 1) 'answer = self.settings.working_function(input_data)' if initiation_function is None;
        # 2) 'answer = self.settings.working_function(self.data_shelf, input_data)' if initiation_function
        #    is not None.
        self.working_function = None
        self.stopping_function = None

        # self.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result) where:
        # average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(average_input_size)
        self.on_input_queue_is_too_big = None
        # self.on_another_bunch_of_data_was_processed(self.data_shelf)
        self.on_another_bunch_of_data_was_processed = None
        self.on_exit = None  # on process exit; self.on_exit(self.data_shelf)

        self.need_multithreading = False  # will use multithread mode if True; and multiprocess else.
        self.process_name: Optional[str] = None  # str()
        self.profile = False  # will start worker in profiling mode
        self.initialization_data = None  # any pickable Python data
        self.transport: Transport = None  # Transport()
        self.tcp_settings = None  # tcp_link.TCPSettings(). Is used when (self.transport == Transport.tcp)
        self.use_internal_subprocess_input_buffer = False  # will be able to get input data in nonblocking mode
        self.sendable_data_type: SendableDataType = SendableDataType.pickable
        # Will be in use when sendable_data_type == SendableDataType.custom.
        # Function will encode data in to bytes()
        self.sendable_data__encoder = None
        # Will be in use when sendable_data_type == SendableDataType.custom.
        # Function will decode data from bytes()
        self.sendable_data__decoder = None
        # only needed if you want to directly connect this worker with another.
        # For example to connect output from this worker to input of another worker.
        self.queue_to_subprocess = None
        # only needed if you want to directly connect this worker with another.
        # For example to connect output from this worker to input of another worker.
        self.queue_from_subprocess = None
        self.input_queue_average_size_trigger_limit = 30
        self.subprocess_polling_timeout = 0.0  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
        self.subprocess_reading_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
        self.subprocess_writing_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
        self.subprocess_invalidation_timeout = 0.1  # None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
        self.indicate_subprocess_readyness: bool = True

    def check(self):
        """Validate the settings.

        :raises Exception: if `working_function` or `transport` is None, or if the tcp transport is selected
            without `tcp_settings`.
        """
        if self.working_function is None:
            raise Exception('working_function cannot be None!')

        if self.transport is None:
            raise Exception('transport can\'t be None')
        else:
            if Transport.tcp == self.transport:
                if self.tcp_settings is None:
                    raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')
                else:
                    self.tcp_settings.check()


class SubprocessWorker:
    """Runs `settings.working_function` in a separate process (or thread) and exchanges messages with it.

    Messages travelling to the worker have the shape `(continue_processing, (input_data, None))`;
    messages travelling back have the shape `(continue_working, (answer, pickled_exc_info_or_None))`.
    """

    def __init__(self, settings: SubprocessWorkerSettings):
        """
        :param settings: SubprocessWorkerSettings(); you should use copy.copy(SubprocessWorkerSettings(...)) by your
            self if you want
        :return:
        """
        super().__init__()

        self.settings: SubprocessWorkerSettings = settings
        self.settings.check()

        self.data_shelf = None

        self.subprocess_was_initiated = False
        self.subprocess = None
        self.queue_to_subprocess = self.settings.queue_to_subprocess
        self.queue_from_subprocess = self.settings.queue_from_subprocess

        self.list_of_subprocess_input_data: List = list()
        self.input_size_print_sum = 0
        self.input_size_print_counter = 0
        self.input_size_print_counter_limit = 500
        self.subprocess_readyness_indicated: bool = False
        self.input_queue_average_size_trigger = FrontTriggerableVariable(
            FrontTriggerableVariableType.bigger_or_equal, self.settings.input_queue_average_size_trigger_limit)

    def _encode_sendable_data(self, data):
        """Serialize `data` according to `settings.sendable_data_type`.

        Pickable data sent through a queue transport is passed through untouched (the encoding is left to the
        queue); for every other transport it is pickled here.
        """
        data_type = self.settings.sendable_data_type
        if data_type == SendableDataType.marshalable:
            return marshal.dumps(data)
        if data_type == SendableDataType.custom:
            return self.settings.sendable_data__encoder(data)
        if (data_type == SendableDataType.pickable) and not (Transport.queue == self.settings.transport):
            return pickle.dumps(data)
        return data

    def _decode_sendable_data(self, data):
        """Inverse of `_encode_sendable_data`."""
        data_type = self.settings.sendable_data_type
        if data_type == SendableDataType.marshalable:
            return marshal.loads(data)
        if data_type == SendableDataType.custom:
            return self.settings.sendable_data__decoder(data)
        if (data_type == SendableDataType.pickable) and not (Transport.queue == self.settings.transport):
            return pickle.loads(data)
        return data

    @staticmethod
    def _dump_exc_info(exc_info):
        """Pickle an `(exc_type, exc_value, exc_tb)` triple, degrading gracefully.

        Traceback objects cannot be pickled by the standard library alone, so the full triple is tried first
        (it succeeds when the application has installed traceback pickling support), then the triple without
        the traceback, and finally a RuntimeError carrying the formatted traceback text when even the
        exception instance is not picklable.
        """
        exc_type, exc_value, exc_tb = exc_info
        for candidate in ((exc_type, exc_value, exc_tb), (exc_type, exc_value, None)):
            try:
                return pickle.dumps(candidate)
            except Exception:
                continue

        formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
        return pickle.dumps((RuntimeError, RuntimeError(formatted), None))

    def _parsing_worker_wrapper(self, input_data, stop=False):
        """Call the working (or stopping) function and capture its outcome.

        :param input_data: payload for `settings.working_function`; ignored when `stop` is True
        :param stop: call `settings.stopping_function` (or produce 'Stopped') instead of the working function
        :return: `(answer, pickled_exc_info_or_None)`, or None when the function returned None without raising
        """
        exception = None
        answer = None
        try:
            has_data_shelf = self.settings.initiation_function is not None
            if stop:
                stopping_function = self.settings.stopping_function
                if stopping_function is None:
                    answer = 'Stopped'
                elif has_data_shelf:
                    answer = stopping_function(self.data_shelf)
                else:
                    answer = stopping_function()
            elif has_data_shelf:
                answer = self.settings.working_function(self.data_shelf, input_data)
            else:
                answer = self.settings.working_function(input_data)
        except BaseException:
            # Everything is deliberately captured here: the exception is forwarded to the parent, which
            # re-raises it from get_answer_from_subprocess().
            answer = None
            exception = self._dump_exc_info(sys.exc_info())

        if (answer is None) and (exception is None):
            return None

        return (answer, exception)

    def _subprocess_endpoints(self):
        """Return the worker-side `(input, output)` channel ends, or `(None, None)` for unsupported transports."""
        if Transport.pipe == self.settings.transport:
            return self.queue_to_subprocess[1], self.queue_from_subprocess[1]
        if Transport.queue == self.settings.transport:
            return self.queue_to_subprocess, self.queue_from_subprocess
        return None, None

    def _send_to_parent(self, output_channel, message):
        """Encode `message` and push it to the parent through the worker-side output channel."""
        encoded = self._encode_sendable_data(message)
        if Transport.pipe == self.settings.transport:
            output_channel.send_bytes(encoded)
        elif Transport.queue == self.settings.transport:
            output_channel.put(encoded)

    def _account_input_size(self, input_size):
        """Accumulate input-size statistics and fire `on_input_queue_is_too_big` on trigger changes."""
        self.input_size_print_counter += 1
        self.input_size_print_sum += input_size
        if self.input_size_print_counter < self.input_size_print_counter_limit:
            return

        average_input_size = self.input_size_print_sum / self.input_size_print_counter
        self.input_size_print_sum = 0
        self.input_size_print_counter = 0
        average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(average_input_size)
        if (average_input_size_trigger_result is not None) and (self.settings.on_input_queue_is_too_big is not None):
            self.settings.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result)

    def _drain_input_into_buffer(self, input_channel):
        """Move every message that is currently available into `list_of_subprocess_input_data` without blocking."""
        while True:
            if Transport.pipe == self.settings.transport:
                if not input_channel.poll():
                    break
                another_chunk_of_data = input_channel.recv_bytes()
            elif Transport.queue == self.settings.transport:
                if input_channel.empty():
                    break
                another_chunk_of_data = input_channel.get()
            else:
                break

            if another_chunk_of_data:
                self.list_of_subprocess_input_data.append(another_chunk_of_data)

    def _receive_input(self, input_channel):
        """Fetch the next encoded message for the worker loop; None means "nothing to process right now"."""
        if self.settings.use_internal_subprocess_input_buffer:
            self._drain_input_into_buffer(input_channel)
            self._account_input_size(len(self.list_of_subprocess_input_data))
            if self.list_of_subprocess_input_data:
                return self.list_of_subprocess_input_data.pop(0)
            return None

        if Transport.pipe == self.settings.transport:
            return input_channel.recv_bytes()

        if Transport.queue == self.settings.transport:
            try:
                self._account_input_size(input_channel.qsize())
            except NotImplementedError:
                # multiprocessing.Queue.qsize() is not available on every platform (e.g. macOS);
                # statistics are simply skipped there instead of killing the worker.
                pass

            try:
                return input_channel.get(block=True)
            except Empty:
                return None

        return None

    def _subprocess_wrapper(self):
        """Worker main loop: announce readiness, process incoming messages until a stop message arrives.

        Channel failures (OSError, including BrokenPipeError, and ValueError) end the loop silently; the
        worker-side channel ends are always closed on exit.
        """
        input_from_parent_process_queue = None
        output_to_parent_process_queue = None
        try:
            input_from_parent_process_queue, output_to_parent_process_queue = self._subprocess_endpoints()
            if (input_from_parent_process_queue is None) or (output_to_parent_process_queue is None):
                # Unsupported transport: there is nothing to read from, so do not spin forever.
                return

            if self.settings.indicate_subprocess_readyness:
                self._send_to_parent(output_to_parent_process_queue, (True, ('Started', None)))

            if self.settings.initiation_function is not None:
                self.data_shelf = self.settings.initiation_function(self.settings.initialization_data)

            while True:
                data = self._receive_input(input_from_parent_process_queue)
                if data is None:
                    continue

                data = self._decode_sendable_data(data)
                continue_processing: bool = data[0]
                if continue_processing:
                    data_with_exception = data[1]
                    data_only = data_with_exception[0]
                    result = self._parsing_worker_wrapper(data_only)
                    if result is not None:
                        self._send_to_parent(output_to_parent_process_queue, (True, result))
                        if self.settings.on_another_bunch_of_data_was_processed is not None:
                            self.settings.on_another_bunch_of_data_was_processed(self.data_shelf)

                    continue

                # Stop message. The outcome of the stopping function must be captured here; a final message
                # is always sent so that the parent, which waits for it in stop(), is never left hanging.
                result = self._parsing_worker_wrapper((None, None), stop=True)
                if result is None:
                    result = (None, None)

                self._send_to_parent(output_to_parent_process_queue, (False, result))

                if self.settings.on_exit is not None:
                    self.settings.on_exit(self.data_shelf)

                break
        except (OSError, ValueError):
            pass
        finally:
            for channel in (input_from_parent_process_queue, output_to_parent_process_queue):
                if channel is not None:
                    channel.close()

    def _make_channel(self):
        """Create a fresh channel object for the configured transport (None for unsupported transports)."""
        if Transport.pipe == self.settings.transport:
            return Pipe()
        if Transport.queue == self.settings.transport:
            return Queue()
        return None

    def start(self, wait_for_process_readyness: bool = True):
        """Create missing channels and launch the worker if it was not launched yet.

        :param wait_for_process_readyness: wait for the worker readiness message before returning
        :raises SubprocessIsNotReadyError: if waiting was requested and no readiness message arrived
        :raises SubprocessTerminatedError: if the channel to the worker failed while waiting
        """
        if not self.subprocess_was_initiated:
            if self.queue_to_subprocess is None:
                self.queue_to_subprocess = self._make_channel()

            if self.queue_from_subprocess is None:
                self.queue_from_subprocess = self._make_channel()

            if self.settings.profile:
                target = _subprocess_wrapper_profile
                arguments = (self,)
            else:
                target = self._subprocess_wrapper
                arguments = tuple()

            worker_class = Thread if self.settings.need_multithreading else Process
            self.subprocess = worker_class(target=target, args=arguments, daemon=True)
            self.subprocess.start()
            self.subprocess_was_initiated = True

        if wait_for_process_readyness:
            self.wait_for_subprocess_readines(block=True)

    def wait_for_subprocess_readines(self, block: bool = True):
        """Consume the worker readiness message if it was not consumed yet.

        :param block: passed to `get_answer_from_subprocess`
        :raises SubprocessIsNotInitiatedError: if the worker was not started
        :raises SubprocessIsNotReadyError: if no readiness message is available
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        if self.subprocess_readyness_indicated:
            return

        if self.settings.indicate_subprocess_readyness:
            # The flag is raised up-front because get_answer_from_subprocess() calls this method again;
            # it is rolled back if the readiness message could not be obtained.
            self.subprocess_readyness_indicated = True
            try:
                try:
                    self.get_answer_from_subprocess(block=block)
                except Empty:
                    raise SubprocessIsNotReadyError
            except BaseException:
                self.subprocess_readyness_indicated = False
                raise

    def _close_connections(self):
        """Close the parent-side channel ends (no-op when the worker is not marked as initiated)."""
        if self.subprocess_was_initiated:
            if Transport.pipe == self.settings.transport:
                self.queue_to_subprocess[0].close()
                self.queue_from_subprocess[0].close()
            elif Transport.queue == self.settings.transport:
                self.queue_to_subprocess.close()
                self.queue_from_subprocess.close()

    def stop(self):
        """Send the stop message, wait for the worker reply, then close the channels and join the worker."""
        if not self.subprocess_was_initiated:
            return

        self.wait_for_subprocess_readines(block=True)

        data = self._encode_sendable_data((False, (None, None)))

        if Transport.pipe == self.settings.transport:
            self.queue_to_subprocess[0].send_bytes(data)
        elif Transport.queue == self.settings.transport:
            self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)

        try:
            self.get_answer_from_subprocess(block=True)
        except Empty:
            pass
        except SubprocessTerminatedError:
            pass
        finally:
            self._close_connections()
            self.subprocess_was_initiated = False
            self.subprocess.join()

    def _invalidate(self):
        """Close the channels and give the worker `subprocess_invalidation_timeout` seconds to finish."""
        self._close_connections()
        self.subprocess_was_initiated = False
        self.subprocess.join(self.settings.subprocess_invalidation_timeout)

    def send_data_to_subprocess(self, input_data, block: bool = True):
        """Send `input_data` to the worker.

        If (Transport.pipe == self.settings.transport): very large buffers (approximately 32 MB+, though it
        depends on the OS) make the pipe raise ValueError, which is reported here as SubprocessTerminatedError.

        :param input_data: payload for `settings.working_function`
        :param block: retry while the channel reports Full; when False a Full channel drops the message silently
        :raises SubprocessIsNotInitiatedError: if the worker was not started
        :raises SubprocessTerminatedError: if the channel raised OSError or ValueError
        :return:
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=block)

        data = self._encode_sendable_data((True, (input_data, None)))
        need_to_retry = True
        while need_to_retry:
            subprocess_disconnected_or_terminated: bool = False
            try:
                if Transport.pipe == self.settings.transport:
                    self.queue_to_subprocess[0].send_bytes(data)
                elif Transport.queue == self.settings.transport:
                    self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)

                need_to_retry = False
            except OSError:
                subprocess_disconnected_or_terminated = True
            except ValueError:
                subprocess_disconnected_or_terminated = True
            except Full:
                need_to_retry = block

            if subprocess_disconnected_or_terminated:
                self._invalidate()
                raise SubprocessTerminatedError

    def is_input_queue_is_empty(self):
        """Tell whether the parent-side end of the channel to the worker currently reports no data.

        :raises SubprocessIsNotInitiatedError: if the worker was not started
        :raises SubprocessTerminatedError: if the channel raised OSError or ValueError
        :return: bool for the pipe and queue transports; None for other transports
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=False)

        result = None
        subprocess_disconnected_or_terminated: bool = False
        try:
            if Transport.pipe == self.settings.transport:
                result = not self.queue_to_subprocess[0].poll(timeout=0.0)
            elif Transport.queue == self.settings.transport:
                result = self.queue_to_subprocess.empty()
        except OSError:
            subprocess_disconnected_or_terminated = True
        except ValueError:
            subprocess_disconnected_or_terminated = True

        if subprocess_disconnected_or_terminated:
            self._invalidate()
            raise SubprocessTerminatedError

        return result

    def wait_for_data(self, timeout: Optional[Union[float, int]] = None):
        """Poll `is_input_queue_is_empty()` until it returns a true value or `timeout` seconds have passed.

        :param timeout: None - no time limit; otherwise the limit in seconds
        """
        start_time = perf_counter()
        while not self.is_input_queue_is_empty():
            if timeout is not None:
                if (perf_counter() - start_time) >= timeout:
                    break

    def get_answer_from_subprocess(self, block=True):
        """Receive the next answer produced by the worker.

        For the queue transport the read is additionally limited by `settings.subprocess_reading_timeout`.
        An exception captured in the worker is re-raised here.

        :param block: wait for an answer; when False, Empty is raised if nothing is available
        :raises SubprocessIsNotInitiatedError: if the worker was not started
        :raises Empty: if no answer is available (non-blocking mode or reading timeout)
        :raises SubprocessTerminatedError: if the worker reported termination or the channel raised OSError
            or ValueError
        :return: the value returned by `settings.working_function`
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=block)

        subprocess_continue_working = None
        answer = None
        subprocess_disconnected_or_terminated: bool = False
        try:
            if Transport.pipe == self.settings.transport:
                input_from_subprocess_queue = self.queue_from_subprocess[0]
                if (not block) and (not input_from_subprocess_queue.poll(timeout=0.0)):
                    raise Empty()

                subprocess_answer = input_from_subprocess_queue.recv_bytes()
                subprocess_continue_working, answer = self._decode_sendable_data(subprocess_answer)
            elif Transport.queue == self.settings.transport:
                subprocess_answer = self.queue_from_subprocess.get(
                    block=block, timeout=self.settings.subprocess_reading_timeout)
                subprocess_continue_working, answer = self._decode_sendable_data(subprocess_answer)

            if not subprocess_continue_working:
                subprocess_disconnected_or_terminated = True
        except OSError:
            subprocess_disconnected_or_terminated = True
        except ValueError:
            subprocess_disconnected_or_terminated = True

        if subprocess_disconnected_or_terminated:
            self._invalidate()
            raise SubprocessTerminatedError

        result, exception = answer[0], answer[1]
        if exception is not None:
            # The payload was produced by our own worker via pickle.dumps(), so it is trusted input.
            exc_type, exc_value, exc_tb = pickle.loads(exception)
            raise exc_value.with_traceback(exc_tb)

        return result


def _subprocess_wrapper_profile(process_data):
    """Run the worker loop of `process_data` under cProfile.

    Statistics are written to '<process name with spaces replaced by underscores>.prof'; a name based on the
    current PID is used when `settings.process_name` is not set.
    """
    process_name = process_data.settings.process_name
    if process_name is None:
        process_name = 'subprocess_worker_{}'.format(os.getpid())

    printable_name = process_name.replace(' ', '_') + '.prof'
    cProfile.runctx('process_data._subprocess_wrapper()', globals(), locals(), printable_name)


class ExternalPipe:
    """A `multiprocessing.Pipe()` pair together with the same pair in swapped order."""

    def __init__(self):
        self.pipe = Pipe()
        self.inverted_pipe = (self.pipe[1], self.pipe[0])
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Exception raised by Queue.get(block=0)/get_nowait().
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
Exception raised by Queue.put(block=0)/put_nowait().
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class SubprocessWorkerSettings(BaseClassSettings):
    """Configuration consumed by SubprocessWorker."""

    def __init__(self):
        # --- user supplied callables ---------------------------------------------------------------------------
        # self.data_shelf = self.settings.initiation_function(self.initialization_data)
        self.initiation_function = None
        # 1) 'answer = self.settings.working_function(input_data)' if initiation_function is None;
        # 2) 'answer = self.settings.working_function(self.data_shelf, input_data)' if initiation_function
        #    is not None.
        self.working_function = None
        self.stopping_function = None

        # --- notification callbacks ----------------------------------------------------------------------------
        # self.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result) where:
        # average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(average_input_size)
        self.on_input_queue_is_too_big = None
        # self.on_another_bunch_of_data_was_processed(self.data_shelf)
        self.on_another_bunch_of_data_was_processed = None
        # on process exit; self.on_exit(self.data_shelf)
        self.on_exit = None

        # --- execution mode ------------------------------------------------------------------------------------
        # will use multithread mode if True; and multiprocess else.
        self.need_multithreading = False
        # str()
        self.process_name: str = None
        # will start worker in profiling mode
        self.profile = False
        # any pickable Python data
        self.initialization_data = None

        # --- transport and serialization -----------------------------------------------------------------------
        # Transport()
        self.transport: Transport = None
        # tcp_link.TCPSettings(). Is used when (self.transport == Transport.tcp)
        self.tcp_settings = None
        # will be able to get input data in nonblocking mode
        self.use_internal_subprocess_input_buffer = False
        self.sendable_data_type: SendableDataType = SendableDataType.pickable
        # Will be in use when sendable_data_type == SendableDataType.custom.
        # Function will encode data in to bytes()
        self.sendable_data__encoder = None
        # Will be in use when sendable_data_type == SendableDataType.custom.
        # Function will decode data from bytes()
        self.sendable_data__decoder = None
        # only needed if you want to directly connect this worker with another.
        # For example to connect output from this worker to input of another worker.
        self.queue_to_subprocess = None
        # only needed if you want to directly connect this worker with another.
        # For example to connect output from this worker to input of another worker.
        self.queue_from_subprocess = None

        # --- limits and timeouts -------------------------------------------------------------------------------
        self.input_queue_average_size_trigger_limit = 30
        # For every timeout below: None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds
        self.subprocess_polling_timeout = 0.0
        self.subprocess_reading_timeout = 0.1
        self.subprocess_writing_timeout = 0.1
        self.subprocess_invalidation_timeout = 0.1
        self.indicate_subprocess_readyness: bool = True

    def check(self):
        """Validate the settings; raise Exception when a mandatory field is missing."""
        if self.working_function is None:
            raise Exception('working_function cannot be None!')

        if self.transport is None:
            raise Exception('transport can\'t be None')

        # Only the tcp transport needs further validation.
        if not (Transport.tcp == self.transport):
            return

        if self.tcp_settings is None:
            raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')

        self.tcp_settings.check()
def check(self):
    """Validate the settings; raise Exception when a mandatory field is missing.

    `working_function` and `transport` must be set. When the tcp transport is
    selected, `tcp_settings` must be set as well and its own `check()` is run.
    """
    if self.working_function is None:
        raise Exception('working_function cannot be None!')

    if self.transport is None:
        raise Exception('transport can\'t be None')

    # Only the tcp transport needs further validation.
    if not (Transport.tcp == self.transport):
        return

    if self.tcp_settings is None:
        raise Exception('tcp_settings can\'t be None while (Transport.tcp == self.transport)')

    self.tcp_settings.check()
class SubprocessWorker:
    """Run settings.working_function in a separate process (or thread).

    The parent side sends input with send_data_to_subprocess() and reads
    results with get_answer_from_subprocess(). Messages travel over a pair
    of multiprocessing pipes or queues, depending on settings.transport.
    Every message is a tuple (continue_working, (payload, pickled_exception)).
    """

    def __init__(self, settings: 'SubprocessWorkerSettings'):
        """
        :param settings: SubprocessWorkerSettings(); the worker keeps a reference to this object, so pass
            copy.copy(SubprocessWorkerSettings(...)) yourself if you need an independent copy
        """
        super().__init__()

        self.settings: SubprocessWorkerSettings = settings
        self.settings.check()

        self.data_shelf = None

        self.subprocess_was_initiated = False
        self.subprocess = None
        self.queue_to_subprocess = self.settings.queue_to_subprocess
        self.queue_from_subprocess = self.settings.queue_from_subprocess

        self.list_of_subprocess_input_data: List = list()
        self.input_size_print_sum = 0
        self.input_size_print_counter = 0
        self.input_size_print_counter_limit = 500
        self.subprocess_readyness_indicated: bool = False
        self.input_queue_average_size_trigger = FrontTriggerableVariable(
            FrontTriggerableVariableType.bigger_or_equal, self.settings.input_queue_average_size_trigger_limit)

    def _encode_sendable_data(self, data):
        """Serialize a message according to settings.sendable_data_type.

        Pickable data is passed through untouched for the queue transport and
        pickled for any other transport. An unknown data type is returned as is.
        """
        data_type = self.settings.sendable_data_type
        if data_type == SendableDataType.pickable:
            if Transport.queue == self.settings.transport:
                return data
            return pickle.dumps(data)
        if data_type == SendableDataType.marshalable:
            return marshal.dumps(data)
        if data_type == SendableDataType.custom:
            return self.settings.sendable_data__encoder(data)
        return data

    def _decode_sendable_data(self, data):
        """Inverse of _encode_sendable_data()."""
        data_type = self.settings.sendable_data_type
        if data_type == SendableDataType.pickable:
            if Transport.queue == self.settings.transport:
                return data
            # Data comes from the worker started by this object, not from an external source.
            return pickle.loads(data)
        if data_type == SendableDataType.marshalable:
            return marshal.loads(data)
        if data_type == SendableDataType.custom:
            return self.settings.sendable_data__decoder(data)
        return data

    @staticmethod
    def _pickle_exc_info(exc_info):
        """Pickle a sys.exc_info() triple, degrading gracefully.

        Traceback objects are not picklable by the standard pickle module, so
        the full triple only pickles when traceback pickling support was
        installed elsewhere. Otherwise the traceback is replaced with None,
        and if the exception itself cannot be pickled it is replaced with a
        RuntimeError carrying the formatted traceback text.

        :return: bytes that unpickle to (exc_type, exc_value, exc_tb_or_None)
        """
        exc_type, exc_value, exc_tb = exc_info
        try:
            return pickle.dumps(exc_info)
        except Exception:
            pass

        try:
            return pickle.dumps((exc_type, exc_value, None))
        except Exception:
            formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
            return pickle.dumps((RuntimeError, RuntimeError(formatted), None))

    def _parsing_worker_wrapper(self, input_data, stop=False):
        """Call the working (or stopping) function and package its outcome.

        :param input_data: payload passed to working_function
        :param stop: call stopping_function instead of working_function
        :return: (answer, pickled_exception), or None when there is neither an
            answer nor an exception
        """
        answer = None
        exception = None
        try:
            has_data_shelf = self.settings.initiation_function is not None
            if stop:
                stopping_function = self.settings.stopping_function
                if stopping_function is None:
                    answer = 'Stopped'
                elif has_data_shelf:
                    answer = stopping_function(self.data_shelf)
                else:
                    answer = stopping_function()
            elif has_data_shelf:
                answer = self.settings.working_function(self.data_shelf, input_data)
            else:
                answer = self.settings.working_function(input_data)
        except BaseException:
            # Everything is forwarded to the parent, as the original bare "except:" did.
            answer = None
            exception = self._pickle_exc_info(sys.exc_info())

        if (answer is None) and (exception is None):
            return None

        return (answer, exception)

    def _send_to_parent(self, output_to_parent_process_queue, message):
        """Encode a message and push it into the worker -> parent channel."""
        encoded = self._encode_sendable_data(message)
        if Transport.pipe == self.settings.transport:
            output_to_parent_process_queue.send_bytes(encoded)
        elif Transport.queue == self.settings.transport:
            output_to_parent_process_queue.put(encoded)

    def _register_input_size(self, input_size):
        """Accumulate input-size samples and fire on_input_queue_is_too_big.

        Once input_size_print_counter_limit samples are collected, their
        average is fed to input_queue_average_size_trigger. The callback is
        invoked only when the trigger returns something other than None.
        """
        self.input_size_print_counter += 1
        self.input_size_print_sum += input_size
        if self.input_size_print_counter < self.input_size_print_counter_limit:
            return

        average_input_size = self.input_size_print_sum / self.input_size_print_counter
        self.input_size_print_sum = 0
        self.input_size_print_counter = 0
        average_input_size_trigger_result = self.input_queue_average_size_trigger.test_trigger(
            average_input_size)
        if (average_input_size_trigger_result is not None) \
                and (self.settings.on_input_queue_is_too_big is not None):
            self.settings.on_input_queue_is_too_big(self.data_shelf, average_input_size_trigger_result)

    def _subprocess_wrapper(self):
        """Main loop executed inside the worker process/thread."""
        input_from_parent_process_queue = None
        output_to_parent_process_queue = None
        try:
            if Transport.pipe == self.settings.transport:
                input_from_parent_process_queue = self.queue_to_subprocess[1]
                output_to_parent_process_queue = self.queue_from_subprocess[1]
            elif Transport.queue == self.settings.transport:
                input_from_parent_process_queue = self.queue_to_subprocess
                output_to_parent_process_queue = self.queue_from_subprocess

            if self.settings.indicate_subprocess_readyness:
                self._send_to_parent(output_to_parent_process_queue, (True, ('Started', None)))

            if self.settings.initiation_function is not None:
                self.data_shelf = self.settings.initiation_function(self.settings.initialization_data)

            while True:
                data = None
                if self.settings.use_internal_subprocess_input_buffer:
                    # Drain everything that is currently available into the local buffer.
                    while True:
                        another_chunk_of_data = None
                        if Transport.pipe == self.settings.transport:
                            if input_from_parent_process_queue.poll():
                                another_chunk_of_data = input_from_parent_process_queue.recv_bytes()
                            else:
                                break
                        elif Transport.queue == self.settings.transport:
                            if not input_from_parent_process_queue.empty():
                                another_chunk_of_data = input_from_parent_process_queue.get()
                            else:
                                break
                        else:
                            break

                        if another_chunk_of_data:
                            self.list_of_subprocess_input_data.append(another_chunk_of_data)

                    self._register_input_size(len(self.list_of_subprocess_input_data))

                    if self.list_of_subprocess_input_data:
                        # pop(0) avoids re-copying the whole buffer on every item
                        data = self.list_of_subprocess_input_data.pop(0)
                    else:
                        continue
                else:
                    if Transport.pipe == self.settings.transport:
                        data = input_from_parent_process_queue.recv_bytes()
                    elif Transport.queue == self.settings.transport:
                        try:
                            input_size = input_from_parent_process_queue.qsize()
                        except NotImplementedError:
                            # multiprocessing.Queue.qsize() is not available on some platforms
                            # (for example macOS); skip the size monitoring there.
                            input_size = None

                        if input_size is not None:
                            self._register_input_size(input_size)

                        try:
                            data = input_from_parent_process_queue.get(block=True)
                        except Empty:
                            pass

                if data is None:
                    continue

                data = self._decode_sendable_data(data)
                continue_processing: bool = data[0]
                if continue_processing:
                    data_with_exception = data[1]
                    result = self._parsing_worker_wrapper(data_with_exception[0])
                    is_result_was_send = False
                    if result is not None:
                        self._send_to_parent(output_to_parent_process_queue, (True, result))
                        is_result_was_send = True

                    if is_result_was_send and (self.settings.on_another_bunch_of_data_was_processed is not None):
                        self.settings.on_another_bunch_of_data_was_processed(self.data_shelf)
                else:
                    # Stop request. The outcome of the stopping function must be the one reported
                    # back (previously a stale or unbound "result" was inspected here).
                    result = self._parsing_worker_wrapper((None, None), stop=True)
                    if result is None:
                        # Always acknowledge the stop so the parent is not left waiting for an answer.
                        result = (None, None)

                    self._send_to_parent(output_to_parent_process_queue, (False, result))

                    if self.settings.on_exit is not None:
                        self.settings.on_exit(self.data_shelf)

                    break
        except (OSError, ValueError, EOFError):
            # The channel was closed or broken (BrokenPipeError is an OSError; EOFError is raised by
            # recv_bytes() when the other end is closed): nothing more to do in the worker.
            pass
        finally:
            if input_from_parent_process_queue is not None:
                input_from_parent_process_queue.close()

            if output_to_parent_process_queue is not None:
                output_to_parent_process_queue.close()

    def start(self, wait_for_process_readyness: bool = True):
        """Create the channels (if needed) and launch the worker.

        :param wait_for_process_readyness: block until the worker reports readiness
        """
        if not self.subprocess_was_initiated:
            if self.queue_to_subprocess is None:
                if Transport.pipe == self.settings.transport:
                    self.queue_to_subprocess = Pipe()
                elif Transport.queue == self.settings.transport:
                    self.queue_to_subprocess = Queue()

            if self.queue_from_subprocess is None:
                if Transport.pipe == self.settings.transport:
                    self.queue_from_subprocess = Pipe()
                elif Transport.queue == self.settings.transport:
                    self.queue_from_subprocess = Queue()

            if self.settings.profile:
                target = _subprocess_wrapper_profile
                arguments = (self,)
            else:
                target = self._subprocess_wrapper
                arguments = tuple()

            self.subprocess = None
            if self.settings.need_multithreading:
                self.subprocess = Thread(target=target, args=arguments, daemon=True)
            else:
                self.subprocess = Process(target=target, args=arguments, daemon=True)

            self.subprocess.start()
            self.subprocess_was_initiated = True

        if wait_for_process_readyness:
            self.wait_for_subprocess_readines(block=True)

    def wait_for_subprocess_readines(self, block: bool = True):
        """Consume the worker's readiness message, once.

        :param block: forwarded to get_answer_from_subprocess()
        :raises SubprocessIsNotInitiatedError: start() was not called
        :raises SubprocessIsNotReadyError: no readiness message is available yet
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        if self.subprocess_readyness_indicated or (not self.settings.indicate_subprocess_readyness):
            return

        # get_answer_from_subprocess() calls this method again; raising the flag first
        # prevents infinite recursion. It is lowered again on any failure.
        self.subprocess_readyness_indicated = True
        try:
            self.get_answer_from_subprocess(block=block)
        except Empty:
            self.subprocess_readyness_indicated = False
            raise SubprocessIsNotReadyError
        except BaseException:
            self.subprocess_readyness_indicated = False
            raise

    def _close_connections(self):
        """Close the parent-side ends of both channels."""
        if self.subprocess_was_initiated:
            if Transport.pipe == self.settings.transport:
                self.queue_to_subprocess[0].close()
                self.queue_from_subprocess[0].close()
            elif Transport.queue == self.settings.transport:
                self.queue_to_subprocess.close()
                self.queue_from_subprocess.close()

    def stop(self):
        """Ask the worker to stop, wait for it and release the channels.

        A worker that is already gone (broken or closed channel) is treated as
        stopped instead of leaking an exception and skipping the cleanup.
        """
        if not self.subprocess_was_initiated:
            return

        try:
            self.wait_for_subprocess_readines(block=True)
        except SubprocessTerminatedError:
            # get_answer_from_subprocess() has already called _invalidate()
            return

        data = self._encode_sendable_data((False, (None, None)))
        try:
            if Transport.pipe == self.settings.transport:
                self.queue_to_subprocess[0].send_bytes(data)
            elif Transport.queue == self.settings.transport:
                self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)
        except (OSError, ValueError):
            # The stop request could not be delivered, so an unbounded join() could hang;
            # _invalidate() closes the channels and joins with a timeout.
            self._invalidate()
            return

        try:
            self.get_answer_from_subprocess(block=True)
        except Empty:
            pass
        except SubprocessTerminatedError:
            pass
        finally:
            self._close_connections()
            self.subprocess_was_initiated = False
            self.subprocess.join()

    def _invalidate(self):
        """Drop a dead worker: close channels and join with subprocess_invalidation_timeout."""
        self._close_connections()
        self.subprocess_was_initiated = False
        self.subprocess.join(self.settings.subprocess_invalidation_timeout)

    def send_data_to_subprocess(self, input_data, block: bool = True):
        """Send one input item to the worker.

        If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it
        depends on the OS) may raise a ValueError inside the transport, which is reported as
        SubprocessTerminatedError.

        :param input_data: payload for working_function
        :param block: keep retrying while the queue is full
        :return: True when the data was handed to the transport; False when nothing was sent
            (queue is full in non-blocking mode, or the transport has no channel)
        :raises SubprocessIsNotInitiatedError: start() was not called
        :raises SubprocessTerminatedError: the channel is broken or closed
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=block)

        data = self._encode_sendable_data((True, (input_data, None)))
        while True:
            try:
                if Transport.pipe == self.settings.transport:
                    self.queue_to_subprocess[0].send_bytes(data)
                elif Transport.queue == self.settings.transport:
                    self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)
                else:
                    return False

                return True
            except (OSError, ValueError) as error:
                self._invalidate()
                raise SubprocessTerminatedError from error
            except Full:
                if not block:
                    # Previously the item was dropped without any signal to the caller.
                    return False

    def is_input_queue_is_empty(self):
        """Report whether the parent -> worker channel currently looks empty.

        For the pipe transport this is the negated poll() of the parent's end
        of that pipe; for the queue transport it is Queue.empty().

        :raises SubprocessIsNotInitiatedError: start() was not called
        :raises SubprocessTerminatedError: the channel is broken or closed
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=False)

        result = None
        subprocess_disconnected_or_terminated: bool = False
        try:
            if Transport.pipe == self.settings.transport:
                result = not self.queue_to_subprocess[0].poll(timeout=0.0)
            elif Transport.queue == self.settings.transport:
                result = self.queue_to_subprocess.empty()
        except (OSError, ValueError):
            subprocess_disconnected_or_terminated = True

        if subprocess_disconnected_or_terminated:
            self._invalidate()
            raise SubprocessTerminatedError

        return result

    def wait_for_data(self, timeout: Optional[Union[float, int]] = None):
        """Spin until is_input_queue_is_empty() returns True or timeout seconds pass.

        :param timeout: None - no limit; otherwise a limit measured with perf_counter()
        """
        start_time = perf_counter()
        while not self.is_input_queue_is_empty():
            if (timeout is not None) and ((perf_counter() - start_time) >= timeout):
                break

    def get_answer_from_subprocess(self, block=True):
        """Read one answer produced by the worker.

        An exception raised by the working function in the worker is re-raised here.

        :param block: wait for an answer. For the queue transport the wait is limited by
            settings.subprocess_reading_timeout (None - infinite; 0.0 - nonblocking;
            > 0.0 - timeout in seconds). In non-blocking mode Empty() is raised when there is no answer.
        :return: the value returned by working_function
        :raises SubprocessIsNotInitiatedError: start() was not called
        :raises SubprocessTerminatedError: the worker reported termination or the channel is broken/closed
        """
        if not self.subprocess_was_initiated:
            raise SubprocessIsNotInitiatedError

        self.wait_for_subprocess_readines(block=block)

        subprocess_continue_working = None
        answer = None
        subprocess_disconnected_or_terminated: bool = False
        try:
            answer_received = False
            subprocess_answer = None
            if Transport.pipe == self.settings.transport:
                input_from_subprocess_queue = self.queue_from_subprocess[0]
                if (not block) and (not input_from_subprocess_queue.poll(timeout=0.0)):
                    raise Empty()

                subprocess_answer = input_from_subprocess_queue.recv_bytes()
                answer_received = True
            elif Transport.queue == self.settings.transport:
                subprocess_answer = self.queue_from_subprocess.get(
                    block=block, timeout=self.settings.subprocess_reading_timeout)
                answer_received = True

            if answer_received:
                subprocess_continue_working, answer = self._decode_sendable_data(subprocess_answer)

            if not subprocess_continue_working:
                subprocess_disconnected_or_terminated = True
        except (OSError, ValueError, EOFError):
            # recv_bytes() raises EOFError when the other end of the pipe was closed
            subprocess_disconnected_or_terminated = True

        if subprocess_disconnected_or_terminated:
            self._invalidate()
            raise SubprocessTerminatedError

        result = answer[0]
        exception = answer[1]
        if exception is not None:
            # The payload was pickled by the worker started by this object.
            exc_type, exc_value, exc_tb = pickle.loads(exception)
            raise exc_value.with_traceback(exc_tb)

        return result
def __init__(self, settings: SubprocessWorkerSettings):
    """
    :param settings: SubprocessWorkerSettings(); it is stored by reference, so pass
        copy.copy(SubprocessWorkerSettings(...)) yourself if you want
    :return:
    """
    super().__init__()

    # The settings object is validated right after it is stored.
    self.settings: SubprocessWorkerSettings = settings
    settings.check()

    # Filled by the worker with the result of settings.initiation_function
    self.data_shelf = None

    # Worker handle and the two channels (optionally supplied through the settings)
    self.subprocess_was_initiated = False
    self.subprocess = None
    self.queue_to_subprocess = settings.queue_to_subprocess
    self.queue_from_subprocess = settings.queue_from_subprocess

    # Worker-side input buffer and the counters used to average its size
    self.list_of_subprocess_input_data: List = []
    self.input_size_print_sum = 0
    self.input_size_print_counter = 0
    self.input_size_print_counter_limit = 500

    self.subprocess_readyness_indicated: bool = False

    trigger_limit = settings.input_queue_average_size_trigger_limit
    self.input_queue_average_size_trigger = FrontTriggerableVariable(
        FrontTriggerableVariableType.bigger_or_equal, trigger_limit)
:param settings: SubprocessWorkerSettings(); the worker keeps a reference to this object, so pass copy.copy(SubprocessWorkerSettings(...)) yourself if you need an independent copy :return:
def start(self, wait_for_process_readyness: bool = True):
    """Create missing channels, launch the worker and optionally wait for it.

    Nothing is launched when the worker was already initiated.

    :param wait_for_process_readyness: call wait_for_subprocess_readines(block=True) afterwards
    """
    if not self.subprocess_was_initiated:
        transport = self.settings.transport

        # Channels supplied through the settings are kept; missing ones are created here.
        for channel_name in ('queue_to_subprocess', 'queue_from_subprocess'):
            if getattr(self, channel_name) is not None:
                continue

            if Transport.pipe == transport:
                setattr(self, channel_name, Pipe())
            elif Transport.queue == transport:
                setattr(self, channel_name, Queue())

        if self.settings.profile:
            target, arguments = _subprocess_wrapper_profile, (self,)
        else:
            target, arguments = self._subprocess_wrapper, tuple()

        self.subprocess = None
        runner_class = Thread if self.settings.need_multithreading else Process
        self.subprocess = runner_class(target=target, args=arguments, daemon=True)

        self.subprocess.start()
        self.subprocess_was_initiated = True

    if wait_for_process_readyness:
        self.wait_for_subprocess_readines(block=True)
def wait_for_subprocess_readines(self, block: bool = True):
    """Consume the worker's readiness message, once.

    Does nothing when readiness was already seen or when
    settings.indicate_subprocess_readyness is off.

    :param block: forwarded to get_answer_from_subprocess()
    :raises SubprocessIsNotInitiatedError: the worker was not started
    :raises SubprocessIsNotReadyError: get_answer_from_subprocess() raised Empty
    """
    if not self.subprocess_was_initiated:
        raise SubprocessIsNotInitiatedError

    if self.subprocess_readyness_indicated or (not self.settings.indicate_subprocess_readyness):
        return

    # The flag is raised before the read and lowered again on any failure.
    self.subprocess_readyness_indicated = True
    try:
        self.get_answer_from_subprocess(block=block)
    except Empty:
        self.subprocess_readyness_indicated = False
        raise SubprocessIsNotReadyError
    except BaseException:
        self.subprocess_readyness_indicated = False
        raise

    self.subprocess_readyness_indicated = True
def stop(self):
    """Ask the worker to stop, wait for it and release the channels.

    Does nothing when the worker was not started. A worker that is already
    gone (broken or closed channel) is treated as stopped: previously the
    exception escaped from here and the cleanup was skipped.
    """
    if not self.subprocess_was_initiated:
        return

    try:
        self.wait_for_subprocess_readines(block=True)
    except SubprocessTerminatedError:
        # get_answer_from_subprocess() calls _invalidate() before raising this,
        # so the channels are already closed and the worker was joined with a timeout.
        return

    data = self._encode_sendable_data((False, (None, None)))

    try:
        if Transport.pipe == self.settings.transport:
            self.queue_to_subprocess[0].send_bytes(data)
        elif Transport.queue == self.settings.transport:
            self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)
    except (OSError, ValueError):
        # The stop request could not be delivered, so an unbounded join() could hang;
        # _invalidate() closes the channels and joins with a timeout instead.
        self._invalidate()
        return

    try:
        self.get_answer_from_subprocess(block=True)
    except Empty:
        pass
    except SubprocessTerminatedError:
        pass
    finally:
        self._close_connections()
        self.subprocess_was_initiated = False
        self.subprocess.join()
def send_data_to_subprocess(self, input_data, block: bool = True):
    """Send one input item to the worker.

    If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on
    the OS) may raise a ValueError inside the transport, which is reported as SubprocessTerminatedError.

    :param input_data: payload for the working function
    :param block: keep retrying while the queue is full
    :return: True when the data was handed to the transport; False when nothing was sent (the queue is
        full in non-blocking mode, or the transport has no channel). Previously such an item was
        dropped without any signal and None was always returned.
    :raises SubprocessIsNotInitiatedError: the worker was not started
    :raises SubprocessTerminatedError: the channel is broken or closed
    """
    if not self.subprocess_was_initiated:
        raise SubprocessIsNotInitiatedError

    self.wait_for_subprocess_readines(block=block)

    data = self._encode_sendable_data((True, (input_data, None)))
    while True:
        try:
            if Transport.pipe == self.settings.transport:
                self.queue_to_subprocess[0].send_bytes(data)
            elif Transport.queue == self.settings.transport:
                self.queue_to_subprocess.put(data, timeout=self.settings.subprocess_writing_timeout)
            else:
                return False

            return True
        except (OSError, ValueError) as error:
            self._invalidate()
            raise SubprocessTerminatedError from error
        except Full:
            if not block:
                return False
If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception :param input_data: :return:
def is_input_queue_is_empty(self):
    """Report whether the parent -> worker channel currently looks empty.

    For the pipe transport this is the negated poll() of the parent's end of
    that pipe; for the queue transport it is Queue.empty().

    :return: bool, or None when the transport is neither pipe nor queue
    :raises SubprocessIsNotInitiatedError: the worker was not started
    :raises SubprocessTerminatedError: the channel raised OSError or ValueError
    """
    if not self.subprocess_was_initiated:
        raise SubprocessIsNotInitiatedError

    self.wait_for_subprocess_readines(block=False)

    is_empty = None
    channel_is_broken = False
    try:
        if Transport.pipe == self.settings.transport:
            parent_end = self.queue_to_subprocess[0]
            is_empty = not parent_end.poll(timeout=0.0)
        elif Transport.queue == self.settings.transport:
            is_empty = self.queue_to_subprocess.empty()
    except (OSError, ValueError):
        channel_is_broken = True

    if channel_is_broken:
        self._invalidate()
        raise SubprocessTerminatedError

    return is_empty
def get_answer_from_subprocess(self, block=True):
    """Read one answer produced by the worker.

    An exception raised by the working function in the worker is re-raised here.

    :param block: wait for an answer. For the queue transport the wait is limited by
        settings.subprocess_reading_timeout (None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds).
        In non-blocking mode Empty() is raised when there is no answer.
    :return: the value returned by the working function
    :raises SubprocessIsNotInitiatedError: the worker was not started
    :raises SubprocessTerminatedError: the worker reported termination, or the channel is broken/closed
        (OSError, ValueError, and EOFError from recv_bytes() when the other end was closed)
    """
    if not self.subprocess_was_initiated:
        raise SubprocessIsNotInitiatedError

    self.wait_for_subprocess_readines(block=block)

    subprocess_continue_working = None
    answer = None
    subprocess_disconnected_or_terminated: bool = False
    try:
        answer_received = False
        subprocess_answer = None
        if Transport.pipe == self.settings.transport:
            input_from_subprocess_queue = self.queue_from_subprocess[0]
            if (not block) and (not input_from_subprocess_queue.poll(timeout=0.0)):
                raise Empty()

            subprocess_answer = input_from_subprocess_queue.recv_bytes()
            answer_received = True
        elif Transport.queue == self.settings.transport:
            subprocess_answer = self.queue_from_subprocess.get(
                block=block, timeout=self.settings.subprocess_reading_timeout)
            answer_received = True

        if answer_received:
            subprocess_continue_working, answer = self._decode_sendable_data(subprocess_answer)

        # With an unsupported transport nothing is read and the flag stays None,
        # which is reported as a terminated worker.
        if not subprocess_continue_working:
            subprocess_disconnected_or_terminated = True
    except (OSError, ValueError, EOFError):
        subprocess_disconnected_or_terminated = True

    if subprocess_disconnected_or_terminated:
        self._invalidate()
        raise SubprocessTerminatedError

    result = answer[0]
    exception = answer[1]
    if exception is not None:
        # The payload was pickled by the worker started by this object.
        exc_type, exc_value, exc_tb = pickle.loads(exception)
        raise exc_value.with_traceback(exc_tb)

    return result
If (Transport.pipe == self.settings.transport): Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception. Will raise Empty() in non-blocking mode when the queue is empty. :param block: wait for an answer if True; do not wait if False. For Transport.queue the wait is limited by settings.subprocess_reading_timeout: None - infinite; 0.0 - nonblocking; > 0.0 - timeout in seconds :return: